001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.workflow.lite;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.io.Writable;
023import org.apache.hadoop.util.ReflectionUtils;
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.client.OozieClient;
026import org.apache.oozie.service.DagXLogInfoService;
027import org.apache.oozie.service.XLogService;
028import org.apache.oozie.util.ParamChecker;
029import org.apache.oozie.util.XConfiguration;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.workflow.WorkflowApp;
032import org.apache.oozie.workflow.WorkflowException;
033import org.apache.oozie.workflow.WorkflowInstance;
034
035import java.io.ByteArrayInputStream;
036import java.io.ByteArrayOutputStream;
037import java.io.DataInput;
038import java.io.DataOutput;
039import java.io.IOException;
040import java.util.ArrayList;
041import java.util.HashMap;
042import java.util.List;
043import java.util.Map;
044
045//TODO javadoc
046public class LiteWorkflowInstance implements Writable, WorkflowInstance {
047    private static final String TRANSITION_TO = "transition.to";
048
049    private XLog log = XLog.getLog(getClass());
050
051    private static String PATH_SEPARATOR = "/";
052    private static String ROOT = PATH_SEPARATOR;
053    private static String TRANSITION_SEPARATOR = "#";
054
055    // Using unique string to indicate version. This is to make sure that it
056    // doesn't match with user data.
057    private static final String DATA_VERSION = "V==1";
058
059    private static class NodeInstance {
060        String nodeName;
061        boolean started = false;
062
063        private NodeInstance(String nodeName) {
064            this.nodeName = nodeName;
065        }
066    }
067
068    private class Context implements NodeHandler.Context {
069        private NodeDef nodeDef;
070        private String executionPath;
071        private String exitState;
072        private Status status = Status.RUNNING;
073
074        private Context(NodeDef nodeDef, String executionPath, String exitState) {
075            this.nodeDef = nodeDef;
076            this.executionPath = executionPath;
077            this.exitState = exitState;
078        }
079
080        public NodeDef getNodeDef() {
081            return nodeDef;
082        }
083
084        public String getExecutionPath() {
085            return executionPath;
086        }
087
088        public String getParentExecutionPath(String executionPath) {
089            return LiteWorkflowInstance.getParentPath(executionPath);
090        }
091
092        public String getSignalValue() {
093            return exitState;
094        }
095
096        public String createExecutionPath(String name) {
097            return LiteWorkflowInstance.createChildPath(executionPath, name);
098        }
099
100        public String createFullTransition(String executionPath, String transition) {
101            return LiteWorkflowInstance.createFullTransition(executionPath, transition);
102        }
103
104        public void deleteExecutionPath() {
105            if (!executionPaths.containsKey(executionPath)) {
106                throw new IllegalStateException();
107            }
108            executionPaths.remove(executionPath);
109            executionPath = LiteWorkflowInstance.getParentPath(executionPath);
110        }
111
112        public void failJob() {
113            status = Status.FAILED;
114        }
115
116        public void killJob() {
117            status = Status.KILLED;
118        }
119
120        public void completeJob() {
121            status = Status.SUCCEEDED;
122        }
123
124        @Override
125        public Object getTransientVar(String name) {
126            return LiteWorkflowInstance.this.getTransientVar(name);
127        }
128
129        @Override
130        public String getVar(String name) {
131            return LiteWorkflowInstance.this.getVar(name);
132        }
133
134        @Override
135        public void setTransientVar(String name, Object value) {
136            LiteWorkflowInstance.this.setTransientVar(name, value);
137        }
138
139        @Override
140        public void setVar(String name, String value) {
141            LiteWorkflowInstance.this.setVar(name, value);
142        }
143
144        @Override
145        public LiteWorkflowInstance getProcessInstance() {
146            return LiteWorkflowInstance.this;
147        }
148
149    }
150
151    private LiteWorkflowApp def;
152    private Configuration conf;
153    private String instanceId;
154    private Status status;
155    private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
156    private Map<String, String> persistentVars = new HashMap<String, String>();
157    private Map<String, Object> transientVars = new HashMap<String, Object>();
158
159    protected LiteWorkflowInstance() {
160        log = XLog.getLog(getClass());
161    }
162
163    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
164        this();
165        this.def = ParamChecker.notNull(def, "def");
166        this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
167        this.conf = ParamChecker.notNull(conf, "conf");
168        refreshLog();
169        status = Status.PREP;
170    }
171
172    public synchronized boolean start() throws WorkflowException {
173        if (status != Status.PREP) {
174            throw new WorkflowException(ErrorCode.E0719);
175        }
176        log.debug(XLog.STD, "Starting job");
177        status = Status.RUNNING;
178        executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
179        return signal(ROOT, StartNodeDef.START);
180    }
181
182    //todo if suspended store signal and use when resuming
183
184    public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
185        ParamChecker.notEmpty(executionPath, "executionPath");
186        ParamChecker.notNull(signalValue, "signalValue");
187
188        if (status != Status.RUNNING) {
189            throw new WorkflowException(ErrorCode.E0716);
190        }
191
192        NodeInstance nodeJob = executionPaths.get(executionPath);
193        log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}] for node [{2}]", executionPath,
194                signalValue, (nodeJob == null ? null : nodeJob.nodeName));
195        if (nodeJob == null) {
196            status = Status.FAILED;
197            log.error("invalid execution path [{0}]", executionPath);
198        }
199
200        NodeDef nodeDef = null;
201        if (!status.isEndState()) {
202            nodeDef = def.getNode(nodeJob.nodeName);
203            if (nodeDef == null) {
204                status = Status.FAILED;
205                log.error("invalid transition [{0}]", nodeJob.nodeName);
206            }
207        }
208
209        if (!status.isEndState()) {
210            NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
211            boolean exiting = true;
212
213            Context context = new Context(nodeDef, executionPath, signalValue);
214            if (!nodeJob.started) {
215                try {
216                    nodeHandler.loopDetection(context);
217                    exiting = nodeHandler.enter(context);
218                    nodeJob.started = true;
219                }
220                catch (WorkflowException ex) {
221                    status = Status.FAILED;
222                    List<String> killedNodes = terminateNodes(Status.KILLED);
223                    if (killedNodes.size() > 1) {
224                        log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
225                                .size());
226                    }
227                    throw ex;
228                }
229            }
230
231            if (exiting) {
232                List<String> pathsToStart = new ArrayList<String>();
233                List<String> fullTransitions;
234                try {
235                    fullTransitions = nodeHandler.multiExit(context);
236                    int last = fullTransitions.size() - 1;
237                    // TEST THIS
238                    if (last >= 0) {
239                        String transitionTo = getTransitionNode(fullTransitions.get(last));
240                        if (nodeDef instanceof ForkNodeDef) {
241                            transitionTo = "*"; // WF action cannot hold all transitions for a fork.
242                                                // transitions are hardcoded in the WF app.
243                        }
244                        persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
245                                           transitionTo);
246                    }
247                }
248                catch (WorkflowException ex) {
249                    status = Status.FAILED;
250                    throw ex;
251                }
252
253                if (context.status == Status.KILLED) {
254                    status = Status.KILLED;
255                    log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
256                } else if (context.status == Status.FAILED) {
257                    status = Status.FAILED;
258                    log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
259                } else if (context.status == Status.SUCCEEDED) {
260                    status = Status.SUCCEEDED;
261                    log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
262                } else {
263                    for (String fullTransition : fullTransitions) {
264                        //this is the whole trick for forking, we need the executionpath and the transition.
265                        //in case of no forking, last element of executionpath is different from transition.
266                        //in case of forking, they are the same
267
268                        log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
269                                fullTransition);
270
271                        String execPathFromTransition = getExecutionPath(fullTransition);
272                        String transition = getTransitionNode(fullTransition);
273                        def.validateTransition(nodeJob.nodeName, transition);
274
275                        NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
276                        if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
277                            // TODO explain this IF better
278                            // If the WfJob is signaled with the parent
279                            // execution executionPath again
280                            // The Fork node will execute again.. and replace
281                            // the Node WorkflowJobBean
282                            // so this is required to prevent that..
283                            // Question : Should we throw an error in this case
284                            // ??
285                            executionPaths.put(execPathFromTransition, new NodeInstance(transition));
286                            pathsToStart.add(execPathFromTransition);
287                        }
288
289                    }
290
291                    // signal all new synch transitions
292                    for (String pathToStart : pathsToStart) {
293                        signal(pathToStart, "::synch::");
294                    }
295                }
296            }
297        }
298
299        if (status.isEndState()) {
300            if (status == Status.FAILED) {
301                List<String> failedNodes = terminateNodes(status);
302                log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
303                        .size());
304            }
305            else {
306                List<String> killedNodes = terminateNodes(Status.KILLED);
307
308                if (killedNodes.size() > 1) {
309                    log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
310                            .size());
311                }
312            }
313        }
314
315        return status.isEndState();
316    }
317
318    /**
319     * Get NodeDef from workflow instance
320     * @param executionPath execution path
321     * @return node def
322     */
323    public NodeDef getNodeDef(String executionPath) {
324        NodeInstance nodeJob = executionPaths.get(executionPath);
325        NodeDef nodeDef = null;
326        if (nodeJob == null) {
327            log.error("invalid execution path [{0}]", executionPath);
328        }
329        else {
330            nodeDef = def.getNode(nodeJob.nodeName);
331            if (nodeDef == null) {
332                log.error("invalid transition [{0}]", nodeJob.nodeName);
333            }
334        }
335        return nodeDef;
336    }
337
338    public synchronized void fail(String nodeName) throws WorkflowException {
339        if (status.isEndState()) {
340            throw new WorkflowException(ErrorCode.E0718);
341        }
342        String failedNode = failNode(nodeName);
343        if (failedNode != null) {
344            log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
345        }
346        else {
347            //TODO failed attempting to fail the action. EXCEPTION
348        }
349        List<String> killedNodes = killNodes();
350        if (killedNodes.size() > 1) {
351            log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
352        }
353        status = Status.FAILED;
354    }
355
356    public synchronized void kill() throws WorkflowException {
357        if (status.isEndState()) {
358            throw new WorkflowException(ErrorCode.E0718);
359        }
360        log.debug(XLog.STD, "Killing job");
361        List<String> killedNodes = killNodes();
362        if (killedNodes.size() > 1) {
363            log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
364        }
365        status = Status.KILLED;
366    }
367
368    public synchronized void suspend() throws WorkflowException {
369        if (status != Status.RUNNING) {
370            throw new WorkflowException(ErrorCode.E0716);
371        }
372        log.debug(XLog.STD, "Suspending job");
373        this.status = Status.SUSPENDED;
374    }
375
376    public boolean isSuspended() {
377        return (status == Status.SUSPENDED);
378    }
379
380    public synchronized void resume() throws WorkflowException {
381        if (status != Status.SUSPENDED) {
382            throw new WorkflowException(ErrorCode.E0717);
383        }
384        log.debug(XLog.STD, "Resuming job");
385        status = Status.RUNNING;
386    }
387
388    public void setVar(String name, String value) {
389        if (value != null) {
390            persistentVars.put(name, value);
391        }
392        else {
393            persistentVars.remove(name);
394        }
395    }
396
397    @Override
398    public Map<String, String> getAllVars() {
399        return persistentVars;
400    }
401
402    @Override
403    public void setAllVars(Map<String, String> varMap) {
404        persistentVars.putAll(varMap);
405    }
406
407    public String getVar(String name) {
408        return persistentVars.get(name);
409    }
410
411
412    public void setTransientVar(String name, Object value) {
413        if (value != null) {
414            transientVars.put(name, value);
415        }
416        else {
417            transientVars.remove(name);
418        }
419    }
420
421    public boolean hasTransientVar(String name) {
422        return transientVars.containsKey(name);
423    }
424
425    public Object getTransientVar(String name) {
426        return transientVars.get(name);
427    }
428
429    public boolean hasEnded() {
430        return status.isEndState();
431    }
432
433    private List<String> terminateNodes(Status endStatus) {
434        List<String> endNodes = new ArrayList<String>();
435        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
436            if (entry.getValue().started) {
437                NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
438                if (!(nodeDef instanceof ControlNodeDef)) {
439                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
440                    try {
441                        if (endStatus == Status.KILLED) {
442                            nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
443                        }
444                        else {
445                            if (endStatus == Status.FAILED) {
446                                nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
447                            }
448                        }
449                        endNodes.add(nodeDef.getName());
450                    }
451                    catch (Exception ex) {
452                        log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
453                                 nodeDef.getName(), ex);
454                    }
455                }
456            }
457        }
458        return endNodes;
459    }
460
461    private String failNode(String nodeName) {
462        String failedNode = null;
463        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
464            String node = entry.getKey();
465            NodeInstance nodeInstance = entry.getValue();
466            if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
467                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
468                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
469                try {
470                    nodeHandler.fail(new Context(nodeDef, node, null));
471                    failedNode = nodeDef.getName();
472                    nodeInstance.started = false;
473                }
474                catch (Exception ex) {
475                    log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
476                }
477                return failedNode;
478            }
479        }
480        return failedNode;
481    }
482
483    private List<String> killNodes() {
484        List<String> killedNodes = new ArrayList<String>();
485        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
486            String node = entry.getKey();
487            NodeInstance nodeInstance = entry.getValue();
488            if (nodeInstance.started) {
489                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
490                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
491                try {
492                    nodeHandler.kill(new Context(nodeDef, node, null));
493                    killedNodes.add(nodeDef.getName());
494                }
495                catch (Exception ex) {
496                    log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
497                }
498            }
499        }
500        return killedNodes;
501    }
502
503    public LiteWorkflowApp getProcessDefinition() {
504        return def;
505    }
506
507    private static String createChildPath(String path, String child) {
508        return path + child + PATH_SEPARATOR;
509    }
510
511    private static String getParentPath(String path) {
512        path = path.substring(0, path.length() - 1);
513        return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
514    }
515
516    private static String createFullTransition(String executionPath, String transition) {
517        return executionPath + TRANSITION_SEPARATOR + transition;
518    }
519
520    private static String getExecutionPath(String fullTransition) {
521        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
522        if (index == -1) {
523            throw new IllegalArgumentException("Invalid fullTransition");
524        }
525        return fullTransition.substring(0, index);
526    }
527
528    private static String getTransitionNode(String fullTransition) {
529        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
530        if (index == -1) {
531            throw new IllegalArgumentException("Invalid fullTransition");
532        }
533        return fullTransition.substring(index + 1);
534    }
535
536    private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
537        return (NodeHandler) ReflectionUtils.newInstance(handler, null);
538    }
539
540    private void refreshLog() {
541        XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
542        XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
543        XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
544        XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
545        XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
546        log = XLog.getLog(getClass());
547    }
548
549    public Status getStatus() {
550        return status;
551    }
552
553    public void setStatus(Status status) {
554        this.status = status;
555    }
556
557    @Override
558    public void write(DataOutput dOut) throws IOException {
559        dOut.writeUTF(instanceId);
560
561        //Hadoop Configuration has to get its act right
562        ByteArrayOutputStream baos = new ByteArrayOutputStream();
563        conf.writeXml(baos);
564        baos.close();
565        byte[] array = baos.toByteArray();
566        dOut.writeInt(array.length);
567        dOut.write(array);
568
569        def.write(dOut);
570        dOut.writeUTF(status.toString());
571        dOut.writeInt(executionPaths.size());
572        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
573            dOut.writeUTF(entry.getKey());
574            dOut.writeUTF(entry.getValue().nodeName);
575            dOut.writeBoolean(entry.getValue().started);
576        }
577        dOut.writeInt(persistentVars.size());
578        for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
579            dOut.writeUTF(entry.getKey());
580            writeStringAsBytes(entry.getValue(), dOut);
581        }
582    }
583
584    @Override
585    public void readFields(DataInput dIn) throws IOException {
586        instanceId = dIn.readUTF();
587
588        //Hadoop Configuration has to get its act right
589        int len = dIn.readInt();
590        byte[] array = new byte[len];
591        dIn.readFully(array);
592        ByteArrayInputStream bais = new ByteArrayInputStream(array);
593        conf = new XConfiguration(bais);
594
595        def = new LiteWorkflowApp();
596        def.readFields(dIn);
597        status = Status.valueOf(dIn.readUTF());
598        int numExPaths = dIn.readInt();
599        for (int x = 0; x < numExPaths; x++) {
600            String path = dIn.readUTF();
601            String nodeName = dIn.readUTF();
602            boolean isStarted = dIn.readBoolean();
603            NodeInstance nodeInstance = new NodeInstance(nodeName);
604            nodeInstance.started = isStarted;
605            executionPaths.put(path, nodeInstance);
606        }
607        int numVars = dIn.readInt();
608        for (int x = 0; x < numVars; x++) {
609            String vName = dIn.readUTF();
610            String vVal = readBytesAsString(dIn);
611            persistentVars.put(vName, vVal);
612        }
613        refreshLog();
614    }
615
616    private void writeStringAsBytes(String value, DataOutput dOut) throws IOException {
617        if (value == null) {
618            dOut.writeUTF(null);
619            return;
620        }
621        dOut.writeUTF(DATA_VERSION);
622        byte[] data = value.getBytes("UTF-8");
623        dOut.writeInt(data.length);
624        dOut.write(data);
625    }
626
627    private String readBytesAsString(DataInput dIn) throws IOException {
628        String value = dIn.readUTF();
629        if (value != null && value.equals(DATA_VERSION)) {
630            int length = dIn.readInt();
631            byte[] data = new byte[length];
632            dIn.readFully(data);
633            value = new String(data, "UTF-8");
634        }
635        return value;
636    }
637
638    @Override
639    public Configuration getConf() {
640        return conf;
641    }
642
643    @Override
644    public WorkflowApp getApp() {
645        return def;
646    }
647
648    @Override
649    public String getId() {
650        return instanceId;
651    }
652
653    @Override
654    public String getTransition(String node) {
655        return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
656    }
657
658    @Override
659    public boolean equals(Object o) {
660        return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
661    }
662
663    @Override
664    public int hashCode() {
665        return instanceId.hashCode();
666    }
667}