This project has retired. For details, please refer to its Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.workflow.lite;
019
020 import org.apache.oozie.service.XLogService;
021 import org.apache.oozie.service.DagXLogInfoService;
022 import org.apache.oozie.client.OozieClient;
023 import org.apache.hadoop.io.Writable;
024 import org.apache.hadoop.util.ReflectionUtils;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.oozie.workflow.WorkflowApp;
027 import org.apache.oozie.workflow.WorkflowException;
028 import org.apache.oozie.workflow.WorkflowInstance;
029 import org.apache.oozie.util.ParamChecker;
030 import org.apache.oozie.util.XLog;
031 import org.apache.oozie.util.XConfiguration;
032 import org.apache.oozie.ErrorCode;
033
034 import java.io.DataInput;
035 import java.io.DataOutput;
036 import java.io.IOException;
037 import java.io.ByteArrayOutputStream;
038 import java.io.ByteArrayInputStream;
039 import java.util.ArrayList;
040 import java.util.HashMap;
041 import java.util.List;
042 import java.util.Map;
043
044 //TODO javadoc
045 public class LiteWorkflowInstance implements Writable, WorkflowInstance {
046 private static final String TRANSITION_TO = "transition.to";
047
048 private XLog log;
049
050 private static String PATH_SEPARATOR = "/";
051 private static String ROOT = PATH_SEPARATOR;
052 private static String TRANSITION_SEPARATOR = "#";
053
054 private static class NodeInstance {
055 String nodeName;
056 boolean started = false;
057
058 private NodeInstance(String nodeName) {
059 this.nodeName = nodeName;
060 }
061 }
062
063 private class Context implements NodeHandler.Context {
064 private NodeDef nodeDef;
065 private String executionPath;
066 private String exitState;
067 private Status status = Status.RUNNING;
068
069 private Context(NodeDef nodeDef, String executionPath, String exitState) {
070 this.nodeDef = nodeDef;
071 this.executionPath = executionPath;
072 this.exitState = exitState;
073 }
074
075 public NodeDef getNodeDef() {
076 return nodeDef;
077 }
078
079 public String getExecutionPath() {
080 return executionPath;
081 }
082
083 public String getParentExecutionPath(String executionPath) {
084 return LiteWorkflowInstance.getParentPath(executionPath);
085 }
086
087 public String getSignalValue() {
088 return exitState;
089 }
090
091 public String createExecutionPath(String name) {
092 return LiteWorkflowInstance.createChildPath(executionPath, name);
093 }
094
095 public String createFullTransition(String executionPath, String transition) {
096 return LiteWorkflowInstance.createFullTransition(executionPath, transition);
097 }
098
099 public void deleteExecutionPath() {
100 if (!executionPaths.containsKey(executionPath)) {
101 throw new IllegalStateException();
102 }
103 executionPaths.remove(executionPath);
104 executionPath = LiteWorkflowInstance.getParentPath(executionPath);
105 }
106
107 public void failJob() {
108 status = Status.FAILED;
109 }
110
111 public void killJob() {
112 status = Status.KILLED;
113 }
114
115 public void completeJob() {
116 status = Status.SUCCEEDED;
117 }
118
119 @Override
120 public Object getTransientVar(String name) {
121 return LiteWorkflowInstance.this.getTransientVar(name);
122 }
123
124 @Override
125 public String getVar(String name) {
126 return LiteWorkflowInstance.this.getVar(name);
127 }
128
129 @Override
130 public void setTransientVar(String name, Object value) {
131 LiteWorkflowInstance.this.setTransientVar(name, value);
132 }
133
134 @Override
135 public void setVar(String name, String value) {
136 LiteWorkflowInstance.this.setVar(name, value);
137 }
138
139 @Override
140 public LiteWorkflowInstance getProcessInstance() {
141 return LiteWorkflowInstance.this;
142 }
143
144 }
145
146 private LiteWorkflowApp def;
147 private Configuration conf;
148 private String instanceId;
149 private Status status;
150 private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
151 private Map<String, String> persistentVars = new HashMap<String, String>();
152 private Map<String, Object> transientVars = new HashMap<String, Object>();
153
154 protected LiteWorkflowInstance() {
155 log = XLog.getLog(getClass());
156 }
157
158 public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
159 this();
160 this.def = ParamChecker.notNull(def, "def");
161 this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
162 this.conf = ParamChecker.notNull(conf, "conf");
163 refreshLog();
164 status = Status.PREP;
165 }
166
167 public synchronized boolean start() throws WorkflowException {
168 if (status != Status.PREP) {
169 throw new WorkflowException(ErrorCode.E0719);
170 }
171 log.debug(XLog.STD, "Starting job");
172 status = Status.RUNNING;
173 executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
174 return signal(ROOT, StartNodeDef.START);
175 }
176
177 //todo if suspended store signal and use when resuming
178
179 public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
180 ParamChecker.notEmpty(executionPath, "executionPath");
181 ParamChecker.notNull(signalValue, "signalValue");
182 log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue);
183 if (status != Status.RUNNING) {
184 throw new WorkflowException(ErrorCode.E0716);
185 }
186 NodeInstance nodeJob = executionPaths.get(executionPath);
187 if (nodeJob == null) {
188 status = Status.FAILED;
189 log.error("invalid execution path [{0}]", executionPath);
190 }
191 NodeDef nodeDef = null;
192 if (!status.isEndState()) {
193 nodeDef = def.getNode(nodeJob.nodeName);
194 if (nodeDef == null) {
195 status = Status.FAILED;
196 log.error("invalid transition [{0}]", nodeJob.nodeName);
197 }
198 }
199 if (!status.isEndState()) {
200 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
201 boolean exiting = true;
202
203 Context context = new Context(nodeDef, executionPath, signalValue);
204 if (!nodeJob.started) {
205 try {
206 nodeHandler.loopDetection(context);
207 exiting = nodeHandler.enter(context);
208 nodeJob.started = true;
209 }
210 catch (WorkflowException ex) {
211 status = Status.FAILED;
212 throw ex;
213 }
214 }
215
216 if (exiting) {
217 List<String> pathsToStart = new ArrayList<String>();
218 List<String> fullTransitions;
219 try {
220 fullTransitions = nodeHandler.multiExit(context);
221 int last = fullTransitions.size() - 1;
222 // TEST THIS
223 if (last >= 0) {
224 String transitionTo = getTransitionNode(fullTransitions.get(last));
225
226 persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
227 transitionTo);
228 }
229 }
230 catch (WorkflowException ex) {
231 status = Status.FAILED;
232 throw ex;
233 }
234
235 if (context.status == Status.KILLED) {
236 status = Status.KILLED;
237 log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
238 }
239 else {
240 if (context.status == Status.FAILED) {
241 status = Status.FAILED;
242 log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
243 }
244 else {
245 if (context.status == Status.SUCCEEDED) {
246 status = Status.SUCCEEDED;
247 log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
248 }
249 /*
250 else if (context.status == Status.SUSPENDED) {
251 status = Status.SUSPENDED;
252 log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
253 }
254 */
255 else {
256 for (String fullTransition : fullTransitions) {
257 // this is the whole trick for forking, we need the
258 // executionpath and the transition
259 // in the case of no forking last element of
260 // executionpath is different from transition
261 // in the case of forking they are the same
262
263 log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
264 fullTransition);
265
266 String execPathFromTransition = getExecutionPath(fullTransition);
267 String transition = getTransitionNode(fullTransition);
268 def.validateTransition(nodeJob.nodeName, transition);
269
270 NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
271 if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
272 // TODO explain this IF better
273 // If the WfJob is signaled with the parent
274 // execution executionPath again
275 // The Fork node will execute again.. and replace
276 // the Node WorkflowJobBean
277 // so this is required to prevent that..
278 // Question : Should we throw an error in this case
279 // ??
280 executionPaths.put(execPathFromTransition, new NodeInstance(transition));
281 pathsToStart.add(execPathFromTransition);
282 }
283
284 }
285 // signal all new synch transitions
286 for (String pathToStart : pathsToStart) {
287 signal(pathToStart, "::synch::");
288 }
289 }
290 }
291 }
292 }
293 }
294 if (status.isEndState()) {
295 if (status == Status.FAILED) {
296 List<String> failedNodes = terminateNodes(status);
297 log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
298 .size());
299 }
300 else {
301 List<String> killedNodes = terminateNodes(Status.KILLED);
302 if (killedNodes.size() > 1) {
303 log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
304 .size());
305 }
306 }
307 }
308 return status.isEndState();
309 }
310
311 /**
312 * Get NodeDef from workflow instance
313 * @param executionPath execution path
314 * @return node def
315 */
316 public NodeDef getNodeDef(String executionPath) {
317 NodeInstance nodeJob = executionPaths.get(executionPath);
318 NodeDef nodeDef = null;
319 if (nodeJob == null) {
320 log.error("invalid execution path [{0}]", executionPath);
321 }
322 else {
323 nodeDef = def.getNode(nodeJob.nodeName);
324 if (nodeDef == null) {
325 log.error("invalid transition [{0}]", nodeJob.nodeName);
326 }
327 }
328 return nodeDef;
329 }
330
331 public synchronized void fail(String nodeName) throws WorkflowException {
332 if (status.isEndState()) {
333 throw new WorkflowException(ErrorCode.E0718);
334 }
335 String failedNode = failNode(nodeName);
336 if (failedNode != null) {
337 log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
338 }
339 else {
340 //TODO failed attempting to fail the action. EXCEPTION
341 }
342 List<String> killedNodes = killNodes();
343 if (killedNodes.size() > 1) {
344 log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
345 }
346 status = Status.FAILED;
347 }
348
349 public synchronized void kill() throws WorkflowException {
350 if (status.isEndState()) {
351 throw new WorkflowException(ErrorCode.E0718);
352 }
353 log.debug(XLog.STD, "Killing job");
354 List<String> killedNodes = killNodes();
355 if (killedNodes.size() > 1) {
356 log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
357 }
358 status = Status.KILLED;
359 }
360
361 public synchronized void suspend() throws WorkflowException {
362 if (status != Status.RUNNING) {
363 throw new WorkflowException(ErrorCode.E0716);
364 }
365 log.debug(XLog.STD, "Suspending job");
366 this.status = Status.SUSPENDED;
367 }
368
369 public boolean isSuspended() {
370 return (status == Status.SUSPENDED);
371 }
372
373 public synchronized void resume() throws WorkflowException {
374 if (status != Status.SUSPENDED) {
375 throw new WorkflowException(ErrorCode.E0717);
376 }
377 log.debug(XLog.STD, "Resuming job");
378 status = Status.RUNNING;
379 }
380
381 public void setVar(String name, String value) {
382 if (value != null) {
383 persistentVars.put(name, value);
384 }
385 else {
386 persistentVars.remove(name);
387 }
388 }
389
390 @Override
391 public Map<String, String> getAllVars() {
392 return persistentVars;
393 }
394
395 @Override
396 public void setAllVars(Map<String, String> varMap) {
397 persistentVars.putAll(varMap);
398 }
399
400 public String getVar(String name) {
401 return persistentVars.get(name);
402 }
403
404
405 public void setTransientVar(String name, Object value) {
406 if (value != null) {
407 transientVars.put(name, value);
408 }
409 else {
410 transientVars.remove(name);
411 }
412 }
413
414 public boolean hasTransientVar(String name) {
415 return transientVars.containsKey(name);
416 }
417
418 public Object getTransientVar(String name) {
419 return transientVars.get(name);
420 }
421
422 public boolean hasEnded() {
423 return status.isEndState();
424 }
425
426 private List<String> terminateNodes(Status endStatus) {
427 List<String> endNodes = new ArrayList<String>();
428 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
429 if (entry.getValue().started) {
430 NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
431 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
432 try {
433 if (endStatus == Status.KILLED) {
434 nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
435 }
436 else {
437 if (endStatus == Status.FAILED) {
438 nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
439 }
440 }
441 endNodes.add(nodeDef.getName());
442 }
443 catch (Exception ex) {
444 log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
445 nodeDef.getName(), ex);
446 }
447 }
448 }
449 return endNodes;
450 }
451
452 private String failNode(String nodeName) {
453 String failedNode = null;
454 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
455 String node = entry.getKey();
456 NodeInstance nodeInstance = entry.getValue();
457 if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
458 NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
459 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
460 try {
461 nodeHandler.fail(new Context(nodeDef, node, null));
462 failedNode = nodeDef.getName();
463 nodeInstance.started = false;
464 }
465 catch (Exception ex) {
466 log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
467 }
468 return failedNode;
469 }
470 }
471 return failedNode;
472 }
473
474 private List<String> killNodes() {
475 List<String> killedNodes = new ArrayList<String>();
476 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
477 String node = entry.getKey();
478 NodeInstance nodeInstance = entry.getValue();
479 if (nodeInstance.started) {
480 NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
481 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
482 try {
483 nodeHandler.kill(new Context(nodeDef, node, null));
484 killedNodes.add(nodeDef.getName());
485 }
486 catch (Exception ex) {
487 log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
488 }
489 }
490 }
491 return killedNodes;
492 }
493
494 public LiteWorkflowApp getProcessDefinition() {
495 return def;
496 }
497
498 private static String createChildPath(String path, String child) {
499 return path + child + PATH_SEPARATOR;
500 }
501
502 private static String getParentPath(String path) {
503 path = path.substring(0, path.length() - 1);
504 return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
505 }
506
507 private static String createFullTransition(String executionPath, String transition) {
508 return executionPath + TRANSITION_SEPARATOR + transition;
509 }
510
511 private static String getExecutionPath(String fullTransition) {
512 int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
513 if (index == -1) {
514 throw new IllegalArgumentException("Invalid fullTransition");
515 }
516 return fullTransition.substring(0, index);
517 }
518
519 private static String getTransitionNode(String fullTransition) {
520 int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
521 if (index == -1) {
522 throw new IllegalArgumentException("Invalid fullTransition");
523 }
524 return fullTransition.substring(index + 1);
525 }
526
527 private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
528 return (NodeHandler) ReflectionUtils.newInstance(handler, null);
529 }
530
531 private void refreshLog() {
532 XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
533 XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
534 XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
535 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
536 XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
537 log = XLog.getLog(getClass());
538 }
539
540 public Status getStatus() {
541 return status;
542 }
543
544 public void setStatus(Status status) {
545 this.status = status;
546 }
547
548 @Override
549 public void write(DataOutput dOut) throws IOException {
550 dOut.writeUTF(instanceId);
551
552 //Hadoop Configuration has to get its act right
553 ByteArrayOutputStream baos = new ByteArrayOutputStream();
554 conf.writeXml(baos);
555 baos.close();
556 byte[] array = baos.toByteArray();
557 dOut.writeInt(array.length);
558 dOut.write(array);
559
560 def.write(dOut);
561 dOut.writeUTF(status.toString());
562 dOut.writeInt(executionPaths.size());
563 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
564 dOut.writeUTF(entry.getKey());
565 dOut.writeUTF(entry.getValue().nodeName);
566 dOut.writeBoolean(entry.getValue().started);
567 }
568 dOut.writeInt(persistentVars.size());
569 for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
570 dOut.writeUTF(entry.getKey());
571 dOut.writeUTF(entry.getValue());
572 }
573 }
574
575 @Override
576 public void readFields(DataInput dIn) throws IOException {
577 instanceId = dIn.readUTF();
578
579 //Hadoop Configuration has to get its act right
580 int len = dIn.readInt();
581 byte[] array = new byte[len];
582 dIn.readFully(array);
583 ByteArrayInputStream bais = new ByteArrayInputStream(array);
584 conf = new XConfiguration(bais);
585
586 def = new LiteWorkflowApp();
587 def.readFields(dIn);
588 status = Status.valueOf(dIn.readUTF());
589 int numExPaths = dIn.readInt();
590 for (int x = 0; x < numExPaths; x++) {
591 String path = dIn.readUTF();
592 String nodeName = dIn.readUTF();
593 boolean isStarted = dIn.readBoolean();
594 NodeInstance nodeInstance = new NodeInstance(nodeName);
595 nodeInstance.started = isStarted;
596 executionPaths.put(path, nodeInstance);
597 }
598 int numVars = dIn.readInt();
599 for (int x = 0; x < numVars; x++) {
600 String vName = dIn.readUTF();
601 String vVal = dIn.readUTF();
602 persistentVars.put(vName, vVal);
603 }
604 refreshLog();
605 }
606
607 @Override
608 public Configuration getConf() {
609 return conf;
610 }
611
612 @Override
613 public WorkflowApp getApp() {
614 return def;
615 }
616
617 @Override
618 public String getId() {
619 return instanceId;
620 }
621
622 @Override
623 public String getTransition(String node) {
624 return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
625 }
626
627 @Override
628 public boolean equals(Object o) {
629 return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
630 }
631
632 @Override
633 public int hashCode() {
634 return instanceId.hashCode();
635 }
636
637 }