This project has retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.workflow.lite;
019
020 import org.apache.oozie.service.XLogService;
021 import org.apache.oozie.service.DagXLogInfoService;
022 import org.apache.oozie.client.OozieClient;
023 import org.apache.hadoop.io.Writable;
024 import org.apache.hadoop.util.ReflectionUtils;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.oozie.workflow.WorkflowApp;
027 import org.apache.oozie.workflow.WorkflowException;
028 import org.apache.oozie.workflow.WorkflowInstance;
029 import org.apache.oozie.util.ParamChecker;
030 import org.apache.oozie.util.XLog;
031 import org.apache.oozie.util.XConfiguration;
032 import org.apache.oozie.ErrorCode;
033
034 import java.io.DataInput;
035 import java.io.DataOutput;
036 import java.io.IOException;
037 import java.io.ByteArrayOutputStream;
038 import java.io.ByteArrayInputStream;
039 import java.util.ArrayList;
040 import java.util.HashMap;
041 import java.util.List;
042 import java.util.Map;
043
044 //TODO javadoc
045 public class LiteWorkflowInstance implements Writable, WorkflowInstance {
046 private static final String TRANSITION_TO = "transition.to";
047
048 private XLog log;
049
050 private static String PATH_SEPARATOR = "/";
051 private static String ROOT = PATH_SEPARATOR;
052 private static String TRANSITION_SEPARATOR = "#";
053
054 private static class NodeInstance {
055 String nodeName;
056 boolean started = false;
057
058 private NodeInstance(String nodeName) {
059 this.nodeName = nodeName;
060 }
061 }
062
063 private class Context implements NodeHandler.Context {
064 private NodeDef nodeDef;
065 private String executionPath;
066 private String exitState;
067 private Status status = Status.RUNNING;
068
069 private Context(NodeDef nodeDef, String executionPath, String exitState) {
070 this.nodeDef = nodeDef;
071 this.executionPath = executionPath;
072 this.exitState = exitState;
073 }
074
075 public NodeDef getNodeDef() {
076 return nodeDef;
077 }
078
079 public String getExecutionPath() {
080 return executionPath;
081 }
082
083 public String getParentExecutionPath(String executionPath) {
084 return LiteWorkflowInstance.getParentPath(executionPath);
085 }
086
087 public String getSignalValue() {
088 return exitState;
089 }
090
091 public String createExecutionPath(String name) {
092 return LiteWorkflowInstance.createChildPath(executionPath, name);
093 }
094
095 public String createFullTransition(String executionPath, String transition) {
096 return LiteWorkflowInstance.createFullTransition(executionPath, transition);
097 }
098
099 public void deleteExecutionPath() {
100 if (!executionPaths.containsKey(executionPath)) {
101 throw new IllegalStateException();
102 }
103 executionPaths.remove(executionPath);
104 executionPath = LiteWorkflowInstance.getParentPath(executionPath);
105 }
106
107 public void failJob() {
108 status = Status.FAILED;
109 }
110
111 public void killJob() {
112 status = Status.KILLED;
113 }
114
115 public void completeJob() {
116 status = Status.SUCCEEDED;
117 }
118
119 @Override
120 public Object getTransientVar(String name) {
121 return LiteWorkflowInstance.this.getTransientVar(name);
122 }
123
124 @Override
125 public String getVar(String name) {
126 return LiteWorkflowInstance.this.getVar(name);
127 }
128
129 @Override
130 public void setTransientVar(String name, Object value) {
131 LiteWorkflowInstance.this.setTransientVar(name, value);
132 }
133
134 @Override
135 public void setVar(String name, String value) {
136 LiteWorkflowInstance.this.setVar(name, value);
137 }
138
139 @Override
140 public LiteWorkflowInstance getProcessInstance() {
141 return LiteWorkflowInstance.this;
142 }
143
144 }
145
146 private LiteWorkflowApp def;
147 private Configuration conf;
148 private String instanceId;
149 private Status status;
150 private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
151 private Map<String, String> persistentVars = new HashMap<String, String>();
152 private Map<String, Object> transientVars = new HashMap<String, Object>();
153
154 protected LiteWorkflowInstance() {
155 log = XLog.getLog(getClass());
156 }
157
158 public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
159 this();
160 this.def = ParamChecker.notNull(def, "def");
161 this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
162 this.conf = ParamChecker.notNull(conf, "conf");
163 refreshLog();
164 status = Status.PREP;
165 }
166
167 public synchronized boolean start() throws WorkflowException {
168 if (status != Status.PREP) {
169 throw new WorkflowException(ErrorCode.E0719);
170 }
171 log.debug(XLog.STD, "Starting job");
172 status = Status.RUNNING;
173 executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
174 return signal(ROOT, StartNodeDef.START);
175 }
176
177 //todo if suspended store signal and use when resuming
178
179 public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
180 ParamChecker.notEmpty(executionPath, "executionPath");
181 ParamChecker.notNull(signalValue, "signalValue");
182 log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue);
183 if (status != Status.RUNNING) {
184 throw new WorkflowException(ErrorCode.E0716);
185 }
186 NodeInstance nodeJob = executionPaths.get(executionPath);
187 if (nodeJob == null) {
188 status = Status.FAILED;
189 log.error("invalid execution path [{0}]", executionPath);
190 }
191 NodeDef nodeDef = null;
192 if (!status.isEndState()) {
193 nodeDef = def.getNode(nodeJob.nodeName);
194 if (nodeDef == null) {
195 status = Status.FAILED;
196 log.error("invalid transition [{0}]", nodeJob.nodeName);
197 }
198 }
199 if (!status.isEndState()) {
200 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
201 boolean exiting = true;
202
203 Context context = new Context(nodeDef, executionPath, signalValue);
204 if (!nodeJob.started) {
205 try {
206 nodeHandler.loopDetection(context);
207 exiting = nodeHandler.enter(context);
208 nodeJob.started = true;
209 }
210 catch (WorkflowException ex) {
211 status = Status.FAILED;
212 throw ex;
213 }
214 }
215
216 if (exiting) {
217 List<String> pathsToStart = new ArrayList<String>();
218 List<String> fullTransitions;
219 try {
220 fullTransitions = nodeHandler.multiExit(context);
221 int last = fullTransitions.size() - 1;
222 // TEST THIS
223 if (last >= 0) {
224 String transitionTo = getTransitionNode(fullTransitions.get(last));
225 if (nodeDef instanceof ForkNodeDef) {
226 transitionTo = "*"; // WF action cannot hold all transitions for a fork.
227 // transitions are hardcoded in the WF app.
228 }
229 persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
230 transitionTo);
231 }
232 }
233 catch (WorkflowException ex) {
234 status = Status.FAILED;
235 throw ex;
236 }
237
238 if (context.status == Status.KILLED) {
239 status = Status.KILLED;
240 log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
241 }
242 else {
243 if (context.status == Status.FAILED) {
244 status = Status.FAILED;
245 log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
246 }
247 else {
248 if (context.status == Status.SUCCEEDED) {
249 status = Status.SUCCEEDED;
250 log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
251 }
252 /*
253 else if (context.status == Status.SUSPENDED) {
254 status = Status.SUSPENDED;
255 log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
256 }
257 */
258 else {
259 for (String fullTransition : fullTransitions) {
260 // this is the whole trick for forking, we need the
261 // executionpath and the transition
262 // in the case of no forking last element of
263 // executionpath is different from transition
264 // in the case of forking they are the same
265
266 log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
267 fullTransition);
268
269 String execPathFromTransition = getExecutionPath(fullTransition);
270 String transition = getTransitionNode(fullTransition);
271 def.validateTransition(nodeJob.nodeName, transition);
272
273 NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
274 if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
275 // TODO explain this IF better
276 // If the WfJob is signaled with the parent
277 // execution executionPath again
278 // The Fork node will execute again.. and replace
279 // the Node WorkflowJobBean
280 // so this is required to prevent that..
281 // Question : Should we throw an error in this case
282 // ??
283 executionPaths.put(execPathFromTransition, new NodeInstance(transition));
284 pathsToStart.add(execPathFromTransition);
285 }
286
287 }
288 // signal all new synch transitions
289 for (String pathToStart : pathsToStart) {
290 signal(pathToStart, "::synch::");
291 }
292 }
293 }
294 }
295 }
296 }
297 if (status.isEndState()) {
298 if (status == Status.FAILED) {
299 List<String> failedNodes = terminateNodes(status);
300 log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
301 .size());
302 }
303 else {
304 List<String> killedNodes = terminateNodes(Status.KILLED);
305
306 if (killedNodes.size() > 1) {
307 log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
308 .size());
309 }
310 }
311 }
312 return status.isEndState();
313 }
314
315 /**
316 * Get NodeDef from workflow instance
317 * @param executionPath execution path
318 * @return node def
319 */
320 public NodeDef getNodeDef(String executionPath) {
321 NodeInstance nodeJob = executionPaths.get(executionPath);
322 NodeDef nodeDef = null;
323 if (nodeJob == null) {
324 log.error("invalid execution path [{0}]", executionPath);
325 }
326 else {
327 nodeDef = def.getNode(nodeJob.nodeName);
328 if (nodeDef == null) {
329 log.error("invalid transition [{0}]", nodeJob.nodeName);
330 }
331 }
332 return nodeDef;
333 }
334
335 public synchronized void fail(String nodeName) throws WorkflowException {
336 if (status.isEndState()) {
337 throw new WorkflowException(ErrorCode.E0718);
338 }
339 String failedNode = failNode(nodeName);
340 if (failedNode != null) {
341 log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
342 }
343 else {
344 //TODO failed attempting to fail the action. EXCEPTION
345 }
346 List<String> killedNodes = killNodes();
347 if (killedNodes.size() > 1) {
348 log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
349 }
350 status = Status.FAILED;
351 }
352
353 public synchronized void kill() throws WorkflowException {
354 if (status.isEndState()) {
355 throw new WorkflowException(ErrorCode.E0718);
356 }
357 log.debug(XLog.STD, "Killing job");
358 List<String> killedNodes = killNodes();
359 if (killedNodes.size() > 1) {
360 log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
361 }
362 status = Status.KILLED;
363 }
364
365 public synchronized void suspend() throws WorkflowException {
366 if (status != Status.RUNNING) {
367 throw new WorkflowException(ErrorCode.E0716);
368 }
369 log.debug(XLog.STD, "Suspending job");
370 this.status = Status.SUSPENDED;
371 }
372
373 public boolean isSuspended() {
374 return (status == Status.SUSPENDED);
375 }
376
377 public synchronized void resume() throws WorkflowException {
378 if (status != Status.SUSPENDED) {
379 throw new WorkflowException(ErrorCode.E0717);
380 }
381 log.debug(XLog.STD, "Resuming job");
382 status = Status.RUNNING;
383 }
384
385 public void setVar(String name, String value) {
386 if (value != null) {
387 persistentVars.put(name, value);
388 }
389 else {
390 persistentVars.remove(name);
391 }
392 }
393
394 @Override
395 public Map<String, String> getAllVars() {
396 return persistentVars;
397 }
398
399 @Override
400 public void setAllVars(Map<String, String> varMap) {
401 persistentVars.putAll(varMap);
402 }
403
404 public String getVar(String name) {
405 return persistentVars.get(name);
406 }
407
408
409 public void setTransientVar(String name, Object value) {
410 if (value != null) {
411 transientVars.put(name, value);
412 }
413 else {
414 transientVars.remove(name);
415 }
416 }
417
418 public boolean hasTransientVar(String name) {
419 return transientVars.containsKey(name);
420 }
421
422 public Object getTransientVar(String name) {
423 return transientVars.get(name);
424 }
425
426 public boolean hasEnded() {
427 return status.isEndState();
428 }
429
430 private List<String> terminateNodes(Status endStatus) {
431 List<String> endNodes = new ArrayList<String>();
432 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
433 if (entry.getValue().started) {
434 NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
435 if (!(nodeDef instanceof ControlNodeDef)) {
436 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
437 try {
438 if (endStatus == Status.KILLED) {
439 nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
440 }
441 else {
442 if (endStatus == Status.FAILED) {
443 nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
444 }
445 }
446 endNodes.add(nodeDef.getName());
447 }
448 catch (Exception ex) {
449 log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
450 nodeDef.getName(), ex);
451 }
452 }
453 }
454 }
455 return endNodes;
456 }
457
458 private String failNode(String nodeName) {
459 String failedNode = null;
460 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
461 String node = entry.getKey();
462 NodeInstance nodeInstance = entry.getValue();
463 if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
464 NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
465 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
466 try {
467 nodeHandler.fail(new Context(nodeDef, node, null));
468 failedNode = nodeDef.getName();
469 nodeInstance.started = false;
470 }
471 catch (Exception ex) {
472 log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
473 }
474 return failedNode;
475 }
476 }
477 return failedNode;
478 }
479
480 private List<String> killNodes() {
481 List<String> killedNodes = new ArrayList<String>();
482 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
483 String node = entry.getKey();
484 NodeInstance nodeInstance = entry.getValue();
485 if (nodeInstance.started) {
486 NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
487 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
488 try {
489 nodeHandler.kill(new Context(nodeDef, node, null));
490 killedNodes.add(nodeDef.getName());
491 }
492 catch (Exception ex) {
493 log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
494 }
495 }
496 }
497 return killedNodes;
498 }
499
500 public LiteWorkflowApp getProcessDefinition() {
501 return def;
502 }
503
504 private static String createChildPath(String path, String child) {
505 return path + child + PATH_SEPARATOR;
506 }
507
508 private static String getParentPath(String path) {
509 path = path.substring(0, path.length() - 1);
510 return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
511 }
512
513 private static String createFullTransition(String executionPath, String transition) {
514 return executionPath + TRANSITION_SEPARATOR + transition;
515 }
516
517 private static String getExecutionPath(String fullTransition) {
518 int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
519 if (index == -1) {
520 throw new IllegalArgumentException("Invalid fullTransition");
521 }
522 return fullTransition.substring(0, index);
523 }
524
525 private static String getTransitionNode(String fullTransition) {
526 int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
527 if (index == -1) {
528 throw new IllegalArgumentException("Invalid fullTransition");
529 }
530 return fullTransition.substring(index + 1);
531 }
532
533 private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
534 return (NodeHandler) ReflectionUtils.newInstance(handler, null);
535 }
536
537 private void refreshLog() {
538 XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
539 XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
540 XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
541 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
542 XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
543 log = XLog.getLog(getClass());
544 }
545
546 public Status getStatus() {
547 return status;
548 }
549
550 public void setStatus(Status status) {
551 this.status = status;
552 }
553
554 @Override
555 public void write(DataOutput dOut) throws IOException {
556 dOut.writeUTF(instanceId);
557
558 //Hadoop Configuration has to get its act right
559 ByteArrayOutputStream baos = new ByteArrayOutputStream();
560 conf.writeXml(baos);
561 baos.close();
562 byte[] array = baos.toByteArray();
563 dOut.writeInt(array.length);
564 dOut.write(array);
565
566 def.write(dOut);
567 dOut.writeUTF(status.toString());
568 dOut.writeInt(executionPaths.size());
569 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
570 dOut.writeUTF(entry.getKey());
571 dOut.writeUTF(entry.getValue().nodeName);
572 dOut.writeBoolean(entry.getValue().started);
573 }
574 dOut.writeInt(persistentVars.size());
575 for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
576 dOut.writeUTF(entry.getKey());
577 dOut.writeUTF(entry.getValue());
578 }
579 }
580
581 @Override
582 public void readFields(DataInput dIn) throws IOException {
583 instanceId = dIn.readUTF();
584
585 //Hadoop Configuration has to get its act right
586 int len = dIn.readInt();
587 byte[] array = new byte[len];
588 dIn.readFully(array);
589 ByteArrayInputStream bais = new ByteArrayInputStream(array);
590 conf = new XConfiguration(bais);
591
592 def = new LiteWorkflowApp();
593 def.readFields(dIn);
594 status = Status.valueOf(dIn.readUTF());
595 int numExPaths = dIn.readInt();
596 for (int x = 0; x < numExPaths; x++) {
597 String path = dIn.readUTF();
598 String nodeName = dIn.readUTF();
599 boolean isStarted = dIn.readBoolean();
600 NodeInstance nodeInstance = new NodeInstance(nodeName);
601 nodeInstance.started = isStarted;
602 executionPaths.put(path, nodeInstance);
603 }
604 int numVars = dIn.readInt();
605 for (int x = 0; x < numVars; x++) {
606 String vName = dIn.readUTF();
607 String vVal = dIn.readUTF();
608 persistentVars.put(vName, vVal);
609 }
610 refreshLog();
611 }
612
613 @Override
614 public Configuration getConf() {
615 return conf;
616 }
617
618 @Override
619 public WorkflowApp getApp() {
620 return def;
621 }
622
623 @Override
624 public String getId() {
625 return instanceId;
626 }
627
628 @Override
629 public String getTransition(String node) {
630 return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
631 }
632
633 @Override
634 public boolean equals(Object o) {
635 return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
636 }
637
638 @Override
639 public int hashCode() {
640 return instanceId.hashCode();
641 }
642
643 }