001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.workflow.lite;
019    
020    import org.apache.oozie.service.XLogService;
021    import org.apache.oozie.service.DagXLogInfoService;
022    import org.apache.oozie.client.OozieClient;
023    import org.apache.hadoop.io.Writable;
024    import org.apache.hadoop.util.ReflectionUtils;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.workflow.WorkflowApp;
027    import org.apache.oozie.workflow.WorkflowException;
028    import org.apache.oozie.workflow.WorkflowInstance;
029    import org.apache.oozie.util.ParamChecker;
030    import org.apache.oozie.util.XLog;
031    import org.apache.oozie.util.XConfiguration;
032    import org.apache.oozie.ErrorCode;
033    
034    import java.io.DataInput;
035    import java.io.DataOutput;
036    import java.io.IOException;
037    import java.io.ByteArrayOutputStream;
038    import java.io.ByteArrayInputStream;
039    import java.util.ArrayList;
040    import java.util.HashMap;
041    import java.util.List;
042    import java.util.Map;
043    
044    //TODO javadoc
045    public class LiteWorkflowInstance implements Writable, WorkflowInstance {
046        private static final String TRANSITION_TO = "transition.to";
047    
048        private XLog log;
049    
050        private static String PATH_SEPARATOR = "/";
051        private static String ROOT = PATH_SEPARATOR;
052        private static String TRANSITION_SEPARATOR = "#";
053    
054        private static class NodeInstance {
055            String nodeName;
056            boolean started = false;
057    
058            private NodeInstance(String nodeName) {
059                this.nodeName = nodeName;
060            }
061        }
062    
063        private class Context implements NodeHandler.Context {
064            private NodeDef nodeDef;
065            private String executionPath;
066            private String exitState;
067            private Status status = Status.RUNNING;
068    
069            private Context(NodeDef nodeDef, String executionPath, String exitState) {
070                this.nodeDef = nodeDef;
071                this.executionPath = executionPath;
072                this.exitState = exitState;
073            }
074    
075            public NodeDef getNodeDef() {
076                return nodeDef;
077            }
078    
079            public String getExecutionPath() {
080                return executionPath;
081            }
082    
083            public String getParentExecutionPath(String executionPath) {
084                return LiteWorkflowInstance.getParentPath(executionPath);
085            }
086    
087            public String getSignalValue() {
088                return exitState;
089            }
090    
091            public String createExecutionPath(String name) {
092                return LiteWorkflowInstance.createChildPath(executionPath, name);
093            }
094    
095            public String createFullTransition(String executionPath, String transition) {
096                return LiteWorkflowInstance.createFullTransition(executionPath, transition);
097            }
098    
099            public void deleteExecutionPath() {
100                if (!executionPaths.containsKey(executionPath)) {
101                    throw new IllegalStateException();
102                }
103                executionPaths.remove(executionPath);
104                executionPath = LiteWorkflowInstance.getParentPath(executionPath);
105            }
106    
107            public void failJob() {
108                status = Status.FAILED;
109            }
110    
111            public void killJob() {
112                status = Status.KILLED;
113            }
114    
115            public void completeJob() {
116                status = Status.SUCCEEDED;
117            }
118    
119            @Override
120            public Object getTransientVar(String name) {
121                return LiteWorkflowInstance.this.getTransientVar(name);
122            }
123    
124            @Override
125            public String getVar(String name) {
126                return LiteWorkflowInstance.this.getVar(name);
127            }
128    
129            @Override
130            public void setTransientVar(String name, Object value) {
131                LiteWorkflowInstance.this.setTransientVar(name, value);
132            }
133    
134            @Override
135            public void setVar(String name, String value) {
136                LiteWorkflowInstance.this.setVar(name, value);
137            }
138    
139            @Override
140            public LiteWorkflowInstance getProcessInstance() {
141                return LiteWorkflowInstance.this;
142            }
143    
144        }
145    
146        private LiteWorkflowApp def;
147        private Configuration conf;
148        private String instanceId;
149        private Status status;
150        private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
151        private Map<String, String> persistentVars = new HashMap<String, String>();
152        private Map<String, Object> transientVars = new HashMap<String, Object>();
153    
154        protected LiteWorkflowInstance() {
155            log = XLog.getLog(getClass());
156        }
157    
158        public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
159            this();
160            this.def = ParamChecker.notNull(def, "def");
161            this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
162            this.conf = ParamChecker.notNull(conf, "conf");
163            refreshLog();
164            status = Status.PREP;
165        }
166    
167        public synchronized boolean start() throws WorkflowException {
168            if (status != Status.PREP) {
169                throw new WorkflowException(ErrorCode.E0719);
170            }
171            log.debug(XLog.STD, "Starting job");
172            status = Status.RUNNING;
173            executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
174            return signal(ROOT, StartNodeDef.START);
175        }
176    
177        //todo if suspended store signal and use when resuming
178    
179        public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
180            ParamChecker.notEmpty(executionPath, "executionPath");
181            ParamChecker.notNull(signalValue, "signalValue");
182            log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue);
183            if (status != Status.RUNNING) {
184                throw new WorkflowException(ErrorCode.E0716);
185            }
186            NodeInstance nodeJob = executionPaths.get(executionPath);
187            if (nodeJob == null) {
188                status = Status.FAILED;
189                log.error("invalid execution path [{0}]", executionPath);
190            }
191            NodeDef nodeDef = null;
192            if (!status.isEndState()) {
193                nodeDef = def.getNode(nodeJob.nodeName);
194                if (nodeDef == null) {
195                    status = Status.FAILED;
196                    log.error("invalid transition [{0}]", nodeJob.nodeName);
197                }
198            }
199            if (!status.isEndState()) {
200                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
201                boolean exiting = true;
202    
203                Context context = new Context(nodeDef, executionPath, signalValue);
204                if (!nodeJob.started) {
205                    try {
206                        nodeHandler.loopDetection(context);
207                        exiting = nodeHandler.enter(context);
208                        nodeJob.started = true;
209                    }
210                    catch (WorkflowException ex) {
211                        status = Status.FAILED;
212                        throw ex;
213                    }
214                }
215    
216                if (exiting) {
217                    List<String> pathsToStart = new ArrayList<String>();
218                    List<String> fullTransitions;
219                    try {
220                        fullTransitions = nodeHandler.multiExit(context);
221                        int last = fullTransitions.size() - 1;
222                        // TEST THIS
223                        if (last >= 0) {
224                            String transitionTo = getTransitionNode(fullTransitions.get(last));
225                            if (nodeDef instanceof ForkNodeDef) {
226                                transitionTo = "*"; // WF action cannot hold all transitions for a fork.
227                                                    // transitions are hardcoded in the WF app.
228                            }
229                            persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
230                                               transitionTo);
231                        }
232                    }
233                    catch (WorkflowException ex) {
234                        status = Status.FAILED;
235                        throw ex;
236                    }
237    
238                    if (context.status == Status.KILLED) {
239                        status = Status.KILLED;
240                        log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
241                    }
242                    else {
243                        if (context.status == Status.FAILED) {
244                            status = Status.FAILED;
245                            log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
246                        }
247                        else {
248                            if (context.status == Status.SUCCEEDED) {
249                                status = Status.SUCCEEDED;
250                                log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
251                            }
252    /*
253                    else if (context.status == Status.SUSPENDED) {
254                        status = Status.SUSPENDED;
255                        log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
256                    }
257    */
258                            else {
259                                for (String fullTransition : fullTransitions) {
260                                    // this is the whole trick for forking, we need the
261                                    // executionpath and the transition
262                                    // in the case of no forking last element of
263                                    // executionpath is different from transition
264                                    // in the case of forking they are the same
265    
266                                    log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
267                                              fullTransition);
268    
269                                    String execPathFromTransition = getExecutionPath(fullTransition);
270                                    String transition = getTransitionNode(fullTransition);
271                                    def.validateTransition(nodeJob.nodeName, transition);
272    
273                                    NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
274                                    if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
275                                        // TODO explain this IF better
276                                        // If the WfJob is signaled with the parent
277                                        // execution executionPath again
278                                        // The Fork node will execute again.. and replace
279                                        // the Node WorkflowJobBean
280                                        // so this is required to prevent that..
281                                        // Question : Should we throw an error in this case
282                                        // ??
283                                        executionPaths.put(execPathFromTransition, new NodeInstance(transition));
284                                        pathsToStart.add(execPathFromTransition);
285                                    }
286    
287                                }
288                                // signal all new synch transitions
289                                for (String pathToStart : pathsToStart) {
290                                    signal(pathToStart, "::synch::");
291                                }
292                            }
293                        }
294                    }
295                }
296            }
297            if (status.isEndState()) {
298                if (status == Status.FAILED) {
299                    List<String> failedNodes = terminateNodes(status);
300                    log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
301                            .size());
302                }
303                else {
304                    List<String> killedNodes = terminateNodes(Status.KILLED);
305    
306                    if (killedNodes.size() > 1) {
307                        log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
308                                .size());
309                    }
310                }
311            }
312            return status.isEndState();
313        }
314    
315        /**
316         * Get NodeDef from workflow instance
317         * @param executionPath execution path
318         * @return node def
319         */
320        public NodeDef getNodeDef(String executionPath) {
321            NodeInstance nodeJob = executionPaths.get(executionPath);
322            NodeDef nodeDef = null;
323            if (nodeJob == null) {
324                log.error("invalid execution path [{0}]", executionPath);
325            }
326            else {
327                nodeDef = def.getNode(nodeJob.nodeName);
328                if (nodeDef == null) {
329                    log.error("invalid transition [{0}]", nodeJob.nodeName);
330                }
331            }
332            return nodeDef;
333        }
334    
335        public synchronized void fail(String nodeName) throws WorkflowException {
336            if (status.isEndState()) {
337                throw new WorkflowException(ErrorCode.E0718);
338            }
339            String failedNode = failNode(nodeName);
340            if (failedNode != null) {
341                log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
342            }
343            else {
344                //TODO failed attempting to fail the action. EXCEPTION
345            }
346            List<String> killedNodes = killNodes();
347            if (killedNodes.size() > 1) {
348                log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
349            }
350            status = Status.FAILED;
351        }
352    
353        public synchronized void kill() throws WorkflowException {
354            if (status.isEndState()) {
355                throw new WorkflowException(ErrorCode.E0718);
356            }
357            log.debug(XLog.STD, "Killing job");
358            List<String> killedNodes = killNodes();
359            if (killedNodes.size() > 1) {
360                log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
361            }
362            status = Status.KILLED;
363        }
364    
365        public synchronized void suspend() throws WorkflowException {
366            if (status != Status.RUNNING) {
367                throw new WorkflowException(ErrorCode.E0716);
368            }
369            log.debug(XLog.STD, "Suspending job");
370            this.status = Status.SUSPENDED;
371        }
372    
373        public boolean isSuspended() {
374            return (status == Status.SUSPENDED);
375        }
376    
377        public synchronized void resume() throws WorkflowException {
378            if (status != Status.SUSPENDED) {
379                throw new WorkflowException(ErrorCode.E0717);
380            }
381            log.debug(XLog.STD, "Resuming job");
382            status = Status.RUNNING;
383        }
384    
385        public void setVar(String name, String value) {
386            if (value != null) {
387                persistentVars.put(name, value);
388            }
389            else {
390                persistentVars.remove(name);
391            }
392        }
393    
394        @Override
395        public Map<String, String> getAllVars() {
396            return persistentVars;
397        }
398    
399        @Override
400        public void setAllVars(Map<String, String> varMap) {
401            persistentVars.putAll(varMap);
402        }
403    
404        public String getVar(String name) {
405            return persistentVars.get(name);
406        }
407    
408    
409        public void setTransientVar(String name, Object value) {
410            if (value != null) {
411                transientVars.put(name, value);
412            }
413            else {
414                transientVars.remove(name);
415            }
416        }
417    
418        public boolean hasTransientVar(String name) {
419            return transientVars.containsKey(name);
420        }
421    
422        public Object getTransientVar(String name) {
423            return transientVars.get(name);
424        }
425    
426        public boolean hasEnded() {
427            return status.isEndState();
428        }
429    
430        private List<String> terminateNodes(Status endStatus) {
431            List<String> endNodes = new ArrayList<String>();
432            for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
433                if (entry.getValue().started) {
434                    NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
435                    if (!(nodeDef instanceof ControlNodeDef)) {
436                        NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
437                        try {
438                            if (endStatus == Status.KILLED) {
439                                nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
440                            }
441                            else {
442                                if (endStatus == Status.FAILED) {
443                                    nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
444                                }
445                            }
446                            endNodes.add(nodeDef.getName());
447                        }
448                        catch (Exception ex) {
449                            log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
450                                     nodeDef.getName(), ex);
451                        }
452                    }
453                }
454            }
455            return endNodes;
456        }
457    
458        private String failNode(String nodeName) {
459            String failedNode = null;
460            for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
461                String node = entry.getKey();
462                NodeInstance nodeInstance = entry.getValue();
463                if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
464                    NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
465                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
466                    try {
467                        nodeHandler.fail(new Context(nodeDef, node, null));
468                        failedNode = nodeDef.getName();
469                        nodeInstance.started = false;
470                    }
471                    catch (Exception ex) {
472                        log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
473                    }
474                    return failedNode;
475                }
476            }
477            return failedNode;
478        }
479    
480        private List<String> killNodes() {
481            List<String> killedNodes = new ArrayList<String>();
482            for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
483                String node = entry.getKey();
484                NodeInstance nodeInstance = entry.getValue();
485                if (nodeInstance.started) {
486                    NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
487                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
488                    try {
489                        nodeHandler.kill(new Context(nodeDef, node, null));
490                        killedNodes.add(nodeDef.getName());
491                    }
492                    catch (Exception ex) {
493                        log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
494                    }
495                }
496            }
497            return killedNodes;
498        }
499    
500        public LiteWorkflowApp getProcessDefinition() {
501            return def;
502        }
503    
504        private static String createChildPath(String path, String child) {
505            return path + child + PATH_SEPARATOR;
506        }
507    
508        private static String getParentPath(String path) {
509            path = path.substring(0, path.length() - 1);
510            return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
511        }
512    
513        private static String createFullTransition(String executionPath, String transition) {
514            return executionPath + TRANSITION_SEPARATOR + transition;
515        }
516    
517        private static String getExecutionPath(String fullTransition) {
518            int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
519            if (index == -1) {
520                throw new IllegalArgumentException("Invalid fullTransition");
521            }
522            return fullTransition.substring(0, index);
523        }
524    
525        private static String getTransitionNode(String fullTransition) {
526            int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
527            if (index == -1) {
528                throw new IllegalArgumentException("Invalid fullTransition");
529            }
530            return fullTransition.substring(index + 1);
531        }
532    
533        private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
534            return (NodeHandler) ReflectionUtils.newInstance(handler, null);
535        }
536    
537        private void refreshLog() {
538            XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
539            XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
540            XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
541            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
542            XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
543            log = XLog.getLog(getClass());
544        }
545    
546        public Status getStatus() {
547            return status;
548        }
549    
550        public void setStatus(Status status) {
551            this.status = status;
552        }
553    
554        @Override
555        public void write(DataOutput dOut) throws IOException {
556            dOut.writeUTF(instanceId);
557    
558            //Hadoop Configuration has to get its act right
559            ByteArrayOutputStream baos = new ByteArrayOutputStream();
560            conf.writeXml(baos);
561            baos.close();
562            byte[] array = baos.toByteArray();
563            dOut.writeInt(array.length);
564            dOut.write(array);
565    
566            def.write(dOut);
567            dOut.writeUTF(status.toString());
568            dOut.writeInt(executionPaths.size());
569            for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
570                dOut.writeUTF(entry.getKey());
571                dOut.writeUTF(entry.getValue().nodeName);
572                dOut.writeBoolean(entry.getValue().started);
573            }
574            dOut.writeInt(persistentVars.size());
575            for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
576                dOut.writeUTF(entry.getKey());
577                dOut.writeUTF(entry.getValue());
578            }
579        }
580    
581        @Override
582        public void readFields(DataInput dIn) throws IOException {
583            instanceId = dIn.readUTF();
584    
585            //Hadoop Configuration has to get its act right
586            int len = dIn.readInt();
587            byte[] array = new byte[len];
588            dIn.readFully(array);
589            ByteArrayInputStream bais = new ByteArrayInputStream(array);
590            conf = new XConfiguration(bais);
591    
592            def = new LiteWorkflowApp();
593            def.readFields(dIn);
594            status = Status.valueOf(dIn.readUTF());
595            int numExPaths = dIn.readInt();
596            for (int x = 0; x < numExPaths; x++) {
597                String path = dIn.readUTF();
598                String nodeName = dIn.readUTF();
599                boolean isStarted = dIn.readBoolean();
600                NodeInstance nodeInstance = new NodeInstance(nodeName);
601                nodeInstance.started = isStarted;
602                executionPaths.put(path, nodeInstance);
603            }
604            int numVars = dIn.readInt();
605            for (int x = 0; x < numVars; x++) {
606                String vName = dIn.readUTF();
607                String vVal = dIn.readUTF();
608                persistentVars.put(vName, vVal);
609            }
610            refreshLog();
611        }
612    
613        @Override
614        public Configuration getConf() {
615            return conf;
616        }
617    
618        @Override
619        public WorkflowApp getApp() {
620            return def;
621        }
622    
623        @Override
624        public String getId() {
625            return instanceId;
626        }
627    
628        @Override
629        public String getTransition(String node) {
630            return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
631        }
632    
633        @Override
634        public boolean equals(Object o) {
635            return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
636        }
637    
638        @Override
639        public int hashCode() {
640            return instanceId.hashCode();
641        }
642    
643    }