001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.workflow.lite;
019    
020    import org.apache.oozie.service.XLogService;
021    import org.apache.oozie.service.DagXLogInfoService;
022    import org.apache.oozie.client.OozieClient;
023    import org.apache.hadoop.io.Writable;
024    import org.apache.hadoop.util.ReflectionUtils;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.oozie.workflow.WorkflowApp;
027    import org.apache.oozie.workflow.WorkflowException;
028    import org.apache.oozie.workflow.WorkflowInstance;
029    import org.apache.oozie.util.ParamChecker;
030    import org.apache.oozie.util.XLog;
031    import org.apache.oozie.util.XConfiguration;
032    import org.apache.oozie.ErrorCode;
033    
034    import java.io.DataInput;
035    import java.io.DataOutput;
036    import java.io.IOException;
037    import java.io.ByteArrayOutputStream;
038    import java.io.ByteArrayInputStream;
039    import java.util.ArrayList;
040    import java.util.HashMap;
041    import java.util.List;
042    import java.util.Map;
043    
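    /**
     * {@link WorkflowInstance} implementation that keeps its state (status, active execution paths and
     * persistent variables) in memory and serializes it through the Hadoop {@link Writable} interface.
     */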
045    public class LiteWorkflowInstance implements Writable, WorkflowInstance {
046        private static final String TRANSITION_TO = "transition.to";
047    
048        private XLog log;
049    
050        private static String PATH_SEPARATOR = "/";
051        private static String ROOT = PATH_SEPARATOR;
052        private static String TRANSITION_SEPARATOR = "#";
053    
054        private static class NodeInstance {
055            String nodeName;
056            boolean started = false;
057    
058            private NodeInstance(String nodeName) {
059                this.nodeName = nodeName;
060            }
061        }
062    
063        private class Context implements NodeHandler.Context {
064            private NodeDef nodeDef;
065            private String executionPath;
066            private String exitState;
067            private Status status = Status.RUNNING;
068    
069            private Context(NodeDef nodeDef, String executionPath, String exitState) {
070                this.nodeDef = nodeDef;
071                this.executionPath = executionPath;
072                this.exitState = exitState;
073            }
074    
075            public NodeDef getNodeDef() {
076                return nodeDef;
077            }
078    
079            public String getExecutionPath() {
080                return executionPath;
081            }
082    
083            public String getParentExecutionPath(String executionPath) {
084                return LiteWorkflowInstance.getParentPath(executionPath);
085            }
086    
087            public String getSignalValue() {
088                return exitState;
089            }
090    
091            public String createExecutionPath(String name) {
092                return LiteWorkflowInstance.createChildPath(executionPath, name);
093            }
094    
095            public String createFullTransition(String executionPath, String transition) {
096                return LiteWorkflowInstance.createFullTransition(executionPath, transition);
097            }
098    
099            public void deleteExecutionPath() {
100                if (!executionPaths.containsKey(executionPath)) {
101                    throw new IllegalStateException();
102                }
103                executionPaths.remove(executionPath);
104                executionPath = LiteWorkflowInstance.getParentPath(executionPath);
105            }
106    
107            public void failJob() {
108                status = Status.FAILED;
109            }
110    
111            public void killJob() {
112                status = Status.KILLED;
113            }
114    
115            public void completeJob() {
116                status = Status.SUCCEEDED;
117            }
118    
119            @Override
120            public Object getTransientVar(String name) {
121                return LiteWorkflowInstance.this.getTransientVar(name);
122            }
123    
124            @Override
125            public String getVar(String name) {
126                return LiteWorkflowInstance.this.getVar(name);
127            }
128    
129            @Override
130            public void setTransientVar(String name, Object value) {
131                LiteWorkflowInstance.this.setTransientVar(name, value);
132            }
133    
134            @Override
135            public void setVar(String name, String value) {
136                LiteWorkflowInstance.this.setVar(name, value);
137            }
138    
139            @Override
140            public LiteWorkflowInstance getProcessInstance() {
141                return LiteWorkflowInstance.this;
142            }
143    
144        }
145    
146        private LiteWorkflowApp def;
147        private Configuration conf;
148        private String instanceId;
149        private Status status;
150        private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
151        private Map<String, String> persistentVars = new HashMap<String, String>();
152        private Map<String, Object> transientVars = new HashMap<String, Object>();
153    
154        protected LiteWorkflowInstance() {
155            log = XLog.getLog(getClass());
156        }
157    
158        public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
159            this();
160            this.def = ParamChecker.notNull(def, "def");
161            this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
162            this.conf = ParamChecker.notNull(conf, "conf");
163            refreshLog();
164            status = Status.PREP;
165        }
166    
167        public synchronized boolean start() throws WorkflowException {
168            if (status != Status.PREP) {
169                throw new WorkflowException(ErrorCode.E0719);
170            }
171            log.debug(XLog.STD, "Starting job");
172            status = Status.RUNNING;
173            executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
174            return signal(ROOT, StartNodeDef.START);
175        }
176    
        // TODO: if the job is suspended, store the signal and replay it when the job is resumed.
178    
179        public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
180            ParamChecker.notEmpty(executionPath, "executionPath");
181            ParamChecker.notNull(signalValue, "signalValue");
182            log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue);
183            if (status != Status.RUNNING) {
184                throw new WorkflowException(ErrorCode.E0716);
185            }
186            NodeInstance nodeJob = executionPaths.get(executionPath);
187            if (nodeJob == null) {
188                status = Status.FAILED;
189                log.error("invalid execution path [{0}]", executionPath);
190            }
191            NodeDef nodeDef = null;
192            if (!status.isEndState()) {
193                nodeDef = def.getNode(nodeJob.nodeName);
194                if (nodeDef == null) {
195                    status = Status.FAILED;
196                    log.error("invalid transition [{0}]", nodeJob.nodeName);
197                }
198            }
199            if (!status.isEndState()) {
200                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
201                boolean exiting = true;
202    
203                Context context = new Context(nodeDef, executionPath, signalValue);
204                if (!nodeJob.started) {
205                    try {
206                        nodeHandler.loopDetection(context);
207                        exiting = nodeHandler.enter(context);
208                        nodeJob.started = true;
209                    }
210                    catch (WorkflowException ex) {
211                        status = Status.FAILED;
212                        throw ex;
213                    }
214                }
215    
216                if (exiting) {
217                    List<String> pathsToStart = new ArrayList<String>();
218                    List<String> fullTransitions;
219                    try {
220                        fullTransitions = nodeHandler.multiExit(context);
221                        int last = fullTransitions.size() - 1;
222                        // TEST THIS
223                        if (last >= 0) {
224                            String transitionTo = getTransitionNode(fullTransitions.get(last));
225    
226                            persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
227                                               transitionTo);
228                        }
229                    }
230                    catch (WorkflowException ex) {
231                        status = Status.FAILED;
232                        throw ex;
233                    }
234    
235                    if (context.status == Status.KILLED) {
236                        status = Status.KILLED;
237                        log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
238                    }
239                    else {
240                        if (context.status == Status.FAILED) {
241                            status = Status.FAILED;
242                            log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
243                        }
244                        else {
245                            if (context.status == Status.SUCCEEDED) {
246                                status = Status.SUCCEEDED;
247                                log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
248                            }
249    /*
250                    else if (context.status == Status.SUSPENDED) {
251                        status = Status.SUSPENDED;
252                        log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
253                    }
254    */
255                            else {
256                                for (String fullTransition : fullTransitions) {
257                                    // this is the whole trick for forking, we need the
258                                    // executionpath and the transition
259                                    // in the case of no forking last element of
260                                    // executionpath is different from transition
261                                    // in the case of forking they are the same
262    
263                                    log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
264                                              fullTransition);
265    
266                                    String execPathFromTransition = getExecutionPath(fullTransition);
267                                    String transition = getTransitionNode(fullTransition);
268                                    def.validateTransition(nodeJob.nodeName, transition);
269    
270                                    NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
271                                    if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
272                                        // TODO explain this IF better
273                                        // If the WfJob is signaled with the parent
274                                        // execution executionPath again
275                                        // The Fork node will execute again.. and replace
276                                        // the Node WorkflowJobBean
277                                        // so this is required to prevent that..
278                                        // Question : Should we throw an error in this case
279                                        // ??
280                                        executionPaths.put(execPathFromTransition, new NodeInstance(transition));
281                                        pathsToStart.add(execPathFromTransition);
282                                    }
283    
284                                }
285                                // signal all new synch transitions
286                                for (String pathToStart : pathsToStart) {
287                                    signal(pathToStart, "::synch::");
288                                }
289                            }
290                        }
291                    }
292                }
293            }
294            if (status.isEndState()) {
295                if (status == Status.FAILED) {
296                    List<String> failedNodes = terminateNodes(status);
297                    log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
298                            .size());
299                }
300                else {
301                    List<String> killedNodes = terminateNodes(Status.KILLED);
302                    if (killedNodes.size() > 1) {
303                        log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
304                                .size());
305                    }
306                }
307            }
308            return status.isEndState();
309        }
310    
311        /**
312         * Get NodeDef from workflow instance
313         * @param executionPath execution path
314         * @return node def
315         */
316        public NodeDef getNodeDef(String executionPath) {
317            NodeInstance nodeJob = executionPaths.get(executionPath);
318            NodeDef nodeDef = null;
319            if (nodeJob == null) {
320                log.error("invalid execution path [{0}]", executionPath);
321            }
322            else {
323                nodeDef = def.getNode(nodeJob.nodeName);
324                if (nodeDef == null) {
325                    log.error("invalid transition [{0}]", nodeJob.nodeName);
326                }
327            }
328            return nodeDef;
329        }
330    
331        public synchronized void fail(String nodeName) throws WorkflowException {
332            if (status.isEndState()) {
333                throw new WorkflowException(ErrorCode.E0718);
334            }
335            String failedNode = failNode(nodeName);
336            if (failedNode != null) {
337                log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
338            }
339            else {
340                //TODO failed attempting to fail the action. EXCEPTION
341            }
342            List<String> killedNodes = killNodes();
343            if (killedNodes.size() > 1) {
344                log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
345            }
346            status = Status.FAILED;
347        }
348    
349        public synchronized void kill() throws WorkflowException {
350            if (status.isEndState()) {
351                throw new WorkflowException(ErrorCode.E0718);
352            }
353            log.debug(XLog.STD, "Killing job");
354            List<String> killedNodes = killNodes();
355            if (killedNodes.size() > 1) {
356                log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
357            }
358            status = Status.KILLED;
359        }
360    
361        public synchronized void suspend() throws WorkflowException {
362            if (status != Status.RUNNING) {
363                throw new WorkflowException(ErrorCode.E0716);
364            }
365            log.debug(XLog.STD, "Suspending job");
366            this.status = Status.SUSPENDED;
367        }
368    
369        public boolean isSuspended() {
370            return (status == Status.SUSPENDED);
371        }
372    
373        public synchronized void resume() throws WorkflowException {
374            if (status != Status.SUSPENDED) {
375                throw new WorkflowException(ErrorCode.E0717);
376            }
377            log.debug(XLog.STD, "Resuming job");
378            status = Status.RUNNING;
379        }
380    
381        public void setVar(String name, String value) {
382            if (value != null) {
383                persistentVars.put(name, value);
384            }
385            else {
386                persistentVars.remove(name);
387            }
388        }
389    
390        @Override
391        public Map<String, String> getAllVars() {
392            return persistentVars;
393        }
394    
395        @Override
396        public void setAllVars(Map<String, String> varMap) {
397            persistentVars.putAll(varMap);
398        }
399    
400        public String getVar(String name) {
401            return persistentVars.get(name);
402        }
403    
404    
405        public void setTransientVar(String name, Object value) {
406            if (value != null) {
407                transientVars.put(name, value);
408            }
409            else {
410                transientVars.remove(name);
411            }
412        }
413    
414        public boolean hasTransientVar(String name) {
415            return transientVars.containsKey(name);
416        }
417    
418        public Object getTransientVar(String name) {
419            return transientVars.get(name);
420        }
421    
422        public boolean hasEnded() {
423            return status.isEndState();
424        }
425    
426        private List<String> terminateNodes(Status endStatus) {
427            List<String> endNodes = new ArrayList<String>();
428            for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
429                if (entry.getValue().started) {
430                    NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
431                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
432                    try {
433                        if (endStatus == Status.KILLED) {
434                            nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
435                        }
436                        else {
437                            if (endStatus == Status.FAILED) {
438                                nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
439                            }
440                        }
441                        endNodes.add(nodeDef.getName());
442                    }
443                    catch (Exception ex) {
444                        log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
445                                 nodeDef.getName(), ex);
446                    }
447                }
448            }
449            return endNodes;
450        }
451    
452        private String failNode(String nodeName) {
453            String failedNode = null;
454            for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
455                String node = entry.getKey();
456                NodeInstance nodeInstance = entry.getValue();
457                if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
458                    NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
459                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
460                    try {
461                        nodeHandler.fail(new Context(nodeDef, node, null));
462                        failedNode = nodeDef.getName();
463                        nodeInstance.started = false;
464                    }
465                    catch (Exception ex) {
466                        log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
467                    }
468                    return failedNode;
469                }
470            }
471            return failedNode;
472        }
473    
474        private List<String> killNodes() {
475            List<String> killedNodes = new ArrayList<String>();
476            for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
477                String node = entry.getKey();
478                NodeInstance nodeInstance = entry.getValue();
479                if (nodeInstance.started) {
480                    NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
481                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
482                    try {
483                        nodeHandler.kill(new Context(nodeDef, node, null));
484                        killedNodes.add(nodeDef.getName());
485                    }
486                    catch (Exception ex) {
487                        log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
488                    }
489                }
490            }
491            return killedNodes;
492        }
493    
494        public LiteWorkflowApp getProcessDefinition() {
495            return def;
496        }
497    
498        private static String createChildPath(String path, String child) {
499            return path + child + PATH_SEPARATOR;
500        }
501    
502        private static String getParentPath(String path) {
503            path = path.substring(0, path.length() - 1);
504            return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
505        }
506    
507        private static String createFullTransition(String executionPath, String transition) {
508            return executionPath + TRANSITION_SEPARATOR + transition;
509        }
510    
511        private static String getExecutionPath(String fullTransition) {
512            int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
513            if (index == -1) {
514                throw new IllegalArgumentException("Invalid fullTransition");
515            }
516            return fullTransition.substring(0, index);
517        }
518    
519        private static String getTransitionNode(String fullTransition) {
520            int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
521            if (index == -1) {
522                throw new IllegalArgumentException("Invalid fullTransition");
523            }
524            return fullTransition.substring(index + 1);
525        }
526    
527        private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
528            return (NodeHandler) ReflectionUtils.newInstance(handler, null);
529        }
530    
531        private void refreshLog() {
532            XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
533            XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
534            XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
535            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
536            XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
537            log = XLog.getLog(getClass());
538        }
539    
540        public Status getStatus() {
541            return status;
542        }
543    
544        public void setStatus(Status status) {
545            this.status = status;
546        }
547    
548        @Override
549        public void write(DataOutput dOut) throws IOException {
550            dOut.writeUTF(instanceId);
551    
552            //Hadoop Configuration has to get its act right
553            ByteArrayOutputStream baos = new ByteArrayOutputStream();
554            conf.writeXml(baos);
555            baos.close();
556            byte[] array = baos.toByteArray();
557            dOut.writeInt(array.length);
558            dOut.write(array);
559    
560            def.write(dOut);
561            dOut.writeUTF(status.toString());
562            dOut.writeInt(executionPaths.size());
563            for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
564                dOut.writeUTF(entry.getKey());
565                dOut.writeUTF(entry.getValue().nodeName);
566                dOut.writeBoolean(entry.getValue().started);
567            }
568            dOut.writeInt(persistentVars.size());
569            for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
570                dOut.writeUTF(entry.getKey());
571                dOut.writeUTF(entry.getValue());
572            }
573        }
574    
575        @Override
576        public void readFields(DataInput dIn) throws IOException {
577            instanceId = dIn.readUTF();
578    
579            //Hadoop Configuration has to get its act right
580            int len = dIn.readInt();
581            byte[] array = new byte[len];
582            dIn.readFully(array);
583            ByteArrayInputStream bais = new ByteArrayInputStream(array);
584            conf = new XConfiguration(bais);
585    
586            def = new LiteWorkflowApp();
587            def.readFields(dIn);
588            status = Status.valueOf(dIn.readUTF());
589            int numExPaths = dIn.readInt();
590            for (int x = 0; x < numExPaths; x++) {
591                String path = dIn.readUTF();
592                String nodeName = dIn.readUTF();
593                boolean isStarted = dIn.readBoolean();
594                NodeInstance nodeInstance = new NodeInstance(nodeName);
595                nodeInstance.started = isStarted;
596                executionPaths.put(path, nodeInstance);
597            }
598            int numVars = dIn.readInt();
599            for (int x = 0; x < numVars; x++) {
600                String vName = dIn.readUTF();
601                String vVal = dIn.readUTF();
602                persistentVars.put(vName, vVal);
603            }
604            refreshLog();
605        }
606    
607        @Override
608        public Configuration getConf() {
609            return conf;
610        }
611    
612        @Override
613        public WorkflowApp getApp() {
614            return def;
615        }
616    
617        @Override
618        public String getId() {
619            return instanceId;
620        }
621    
622        @Override
623        public String getTransition(String node) {
624            return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
625        }
626    
627        @Override
628        public boolean equals(Object o) {
629            return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
630        }
631    
632        @Override
633        public int hashCode() {
634            return instanceId.hashCode();
635        }
636    
637    }