001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.workflow.lite;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.io.Writable;
023import org.apache.hadoop.util.ReflectionUtils;
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.client.OozieClient;
026import org.apache.oozie.service.DagXLogInfoService;
027import org.apache.oozie.service.XLogService;
028import org.apache.oozie.util.StringSerializationUtil;
029import org.apache.oozie.util.ParamChecker;
030import org.apache.oozie.util.XConfiguration;
031import org.apache.oozie.util.XLog;
032import org.apache.oozie.workflow.WorkflowApp;
033import org.apache.oozie.workflow.WorkflowException;
034import org.apache.oozie.workflow.WorkflowInstance;
035
036import java.io.ByteArrayInputStream;
037import java.io.ByteArrayOutputStream;
038import java.io.DataInput;
039import java.io.DataOutput;
040import java.io.IOException;
041import java.util.ArrayList;
042import java.util.HashMap;
043import java.util.List;
044import java.util.Map;
045
046//TODO javadoc
047public class LiteWorkflowInstance implements Writable, WorkflowInstance {
048    private static final String TRANSITION_TO = "transition.to";
049
050    private XLog log = XLog.getLog(getClass());
051
052    private static String PATH_SEPARATOR = "/";
053    private static String ROOT = PATH_SEPARATOR;
054    private static String TRANSITION_SEPARATOR = "#";
055
056    private static class NodeInstance {
057        String nodeName;
058        boolean started = false;
059
060        private NodeInstance(String nodeName) {
061            this.nodeName = nodeName;
062        }
063    }
064
065    private class Context implements NodeHandler.Context {
066        private NodeDef nodeDef;
067        private String executionPath;
068        private String exitState;
069        private Status status = Status.RUNNING;
070
071        private Context(NodeDef nodeDef, String executionPath, String exitState) {
072            this.nodeDef = nodeDef;
073            this.executionPath = executionPath;
074            this.exitState = exitState;
075        }
076
077        public NodeDef getNodeDef() {
078            return nodeDef;
079        }
080
081        public String getExecutionPath() {
082            return executionPath;
083        }
084
085        public String getParentExecutionPath(String executionPath) {
086            return LiteWorkflowInstance.getParentPath(executionPath);
087        }
088
089        public String getSignalValue() {
090            return exitState;
091        }
092
093        public String createExecutionPath(String name) {
094            return LiteWorkflowInstance.createChildPath(executionPath, name);
095        }
096
097        public String createFullTransition(String executionPath, String transition) {
098            return LiteWorkflowInstance.createFullTransition(executionPath, transition);
099        }
100
101        public void deleteExecutionPath() {
102            if (!executionPaths.containsKey(executionPath)) {
103                throw new IllegalStateException();
104            }
105            executionPaths.remove(executionPath);
106            executionPath = LiteWorkflowInstance.getParentPath(executionPath);
107        }
108
109        public void failJob() {
110            status = Status.FAILED;
111        }
112
113        public void killJob() {
114            status = Status.KILLED;
115        }
116
117        public void completeJob() {
118            status = Status.SUCCEEDED;
119        }
120
121        @Override
122        public Object getTransientVar(String name) {
123            return LiteWorkflowInstance.this.getTransientVar(name);
124        }
125
126        @Override
127        public String getVar(String name) {
128            return LiteWorkflowInstance.this.getVar(name);
129        }
130
131        @Override
132        public void setTransientVar(String name, Object value) {
133            LiteWorkflowInstance.this.setTransientVar(name, value);
134        }
135
136        @Override
137        public void setVar(String name, String value) {
138            LiteWorkflowInstance.this.setVar(name, value);
139        }
140
141        @Override
142        public LiteWorkflowInstance getProcessInstance() {
143            return LiteWorkflowInstance.this;
144        }
145
146    }
147
148    private LiteWorkflowApp def;
149    private Configuration conf;
150    private String instanceId;
151    private Status status;
152    private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
153    private Map<String, String> persistentVars = new HashMap<String, String>();
154    private Map<String, Object> transientVars = new HashMap<String, Object>();
155
156    protected LiteWorkflowInstance() {
157        log = XLog.getLog(getClass());
158    }
159
160    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
161        this();
162        this.def = ParamChecker.notNull(def, "def");
163        this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
164        this.conf = ParamChecker.notNull(conf, "conf");
165        refreshLog();
166        status = Status.PREP;
167    }
168
169    public synchronized boolean start() throws WorkflowException {
170        if (status != Status.PREP) {
171            throw new WorkflowException(ErrorCode.E0719);
172        }
173        log.debug(XLog.STD, "Starting job");
174        status = Status.RUNNING;
175        executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
176        return signal(ROOT, StartNodeDef.START);
177    }
178
179    //todo if suspended store signal and use when resuming
180
181    public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
182        ParamChecker.notEmpty(executionPath, "executionPath");
183        ParamChecker.notNull(signalValue, "signalValue");
184
185        if (status != Status.RUNNING) {
186            throw new WorkflowException(ErrorCode.E0716);
187        }
188
189        NodeInstance nodeJob = executionPaths.get(executionPath);
190        log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}] for node [{2}]", executionPath,
191                signalValue, (nodeJob == null ? null : nodeJob.nodeName));
192        if (nodeJob == null) {
193            status = Status.FAILED;
194            log.error("invalid execution path [{0}]", executionPath);
195        }
196
197        NodeDef nodeDef = null;
198        if (!status.isEndState()) {
199            nodeDef = def.getNode(nodeJob.nodeName);
200            if (nodeDef == null) {
201                status = Status.FAILED;
202                log.error("invalid transition [{0}]", nodeJob.nodeName);
203            }
204        }
205
206        if (!status.isEndState()) {
207            NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
208            boolean exiting = true;
209
210            Context context = new Context(nodeDef, executionPath, signalValue);
211            if (!nodeJob.started) {
212                try {
213                    nodeHandler.loopDetection(context);
214                    exiting = nodeHandler.enter(context);
215                    nodeJob.started = true;
216                }
217                catch (WorkflowException ex) {
218                    status = Status.FAILED;
219                    List<String> killedNodes = terminateNodes(Status.KILLED);
220                    if (killedNodes.size() > 1) {
221                        log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
222                                .size());
223                    }
224                    throw ex;
225                }
226            }
227
228            if (exiting) {
229                List<String> pathsToStart = new ArrayList<String>();
230                List<String> fullTransitions;
231                try {
232                    fullTransitions = nodeHandler.multiExit(context);
233                    int last = fullTransitions.size() - 1;
234                    // TEST THIS
235                    if (last >= 0) {
236                        String transitionTo = getTransitionNode(fullTransitions.get(last));
237                        if (nodeDef instanceof ForkNodeDef) {
238                            transitionTo = "*"; // WF action cannot hold all transitions for a fork.
239                                                // transitions are hardcoded in the WF app.
240                        }
241                        persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
242                                           transitionTo);
243                    }
244                }
245                catch (WorkflowException ex) {
246                    status = Status.FAILED;
247                    throw ex;
248                }
249
250                if (context.status == Status.KILLED) {
251                    status = Status.KILLED;
252                    log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
253                } else if (context.status == Status.FAILED) {
254                    status = Status.FAILED;
255                    log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
256                } else if (context.status == Status.SUCCEEDED) {
257                    status = Status.SUCCEEDED;
258                    log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
259                } else {
260                    for (String fullTransition : fullTransitions) {
261                        //this is the whole trick for forking, we need the executionpath and the transition.
262                        //in case of no forking, last element of executionpath is different from transition.
263                        //in case of forking, they are the same
264
265                        log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
266                                fullTransition);
267
268                        String execPathFromTransition = getExecutionPath(fullTransition);
269                        String transition = getTransitionNode(fullTransition);
270                        def.validateTransition(nodeJob.nodeName, transition);
271
272                        NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
273                        if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
274                            // TODO explain this IF better
275                            // If the WfJob is signaled with the parent
276                            // execution executionPath again
277                            // The Fork node will execute again.. and replace
278                            // the Node WorkflowJobBean
279                            // so this is required to prevent that..
280                            // Question : Should we throw an error in this case
281                            // ??
282                            executionPaths.put(execPathFromTransition, new NodeInstance(transition));
283                            pathsToStart.add(execPathFromTransition);
284                        }
285
286                    }
287
288                    // signal all new synch transitions
289                    for (String pathToStart : pathsToStart) {
290                        signal(pathToStart, "::synch::");
291                    }
292                }
293            }
294        }
295
296        if (status.isEndState()) {
297            if (status == Status.FAILED) {
298                List<String> failedNodes = terminateNodes(status);
299                log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
300                        .size());
301            }
302            else {
303                List<String> killedNodes = terminateNodes(Status.KILLED);
304
305                if (killedNodes.size() > 1) {
306                    log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
307                            .size());
308                }
309            }
310        }
311
312        return status.isEndState();
313    }
314
315    /**
316     * Get NodeDef from workflow instance
317     * @param executionPath execution path
318     * @return node def
319     */
320    public NodeDef getNodeDef(String executionPath) {
321        NodeInstance nodeJob = executionPaths.get(executionPath);
322        NodeDef nodeDef = null;
323        if (nodeJob == null) {
324            log.error("invalid execution path [{0}]", executionPath);
325        }
326        else {
327            nodeDef = def.getNode(nodeJob.nodeName);
328            if (nodeDef == null) {
329                log.error("invalid transition [{0}]", nodeJob.nodeName);
330            }
331        }
332        return nodeDef;
333    }
334
335    public synchronized void fail(String nodeName) throws WorkflowException {
336        if (status.isEndState()) {
337            throw new WorkflowException(ErrorCode.E0718);
338        }
339        String failedNode = failNode(nodeName);
340        if (failedNode != null) {
341            log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
342        }
343        else {
344            //TODO failed attempting to fail the action. EXCEPTION
345        }
346        List<String> killedNodes = killNodes();
347        if (killedNodes.size() > 1) {
348            log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
349        }
350        status = Status.FAILED;
351    }
352
353    public synchronized void kill() throws WorkflowException {
354        if (status.isEndState()) {
355            throw new WorkflowException(ErrorCode.E0718);
356        }
357        log.debug(XLog.STD, "Killing job");
358        List<String> killedNodes = killNodes();
359        if (killedNodes.size() > 1) {
360            log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
361        }
362        status = Status.KILLED;
363    }
364
365    public synchronized void suspend() throws WorkflowException {
366        if (status != Status.RUNNING) {
367            throw new WorkflowException(ErrorCode.E0716);
368        }
369        log.debug(XLog.STD, "Suspending job");
370        this.status = Status.SUSPENDED;
371    }
372
373    public boolean isSuspended() {
374        return (status == Status.SUSPENDED);
375    }
376
377    public synchronized void resume() throws WorkflowException {
378        if (status != Status.SUSPENDED) {
379            throw new WorkflowException(ErrorCode.E0717);
380        }
381        log.debug(XLog.STD, "Resuming job");
382        status = Status.RUNNING;
383    }
384
385    public void setVar(String name, String value) {
386        if (value != null) {
387            persistentVars.put(name, value);
388        }
389        else {
390            persistentVars.remove(name);
391        }
392    }
393
394    @Override
395    public Map<String, String> getAllVars() {
396        return persistentVars;
397    }
398
399    @Override
400    public void setAllVars(Map<String, String> varMap) {
401        persistentVars.putAll(varMap);
402    }
403
404    public String getVar(String name) {
405        return persistentVars.get(name);
406    }
407
408
409    public void setTransientVar(String name, Object value) {
410        if (value != null) {
411            transientVars.put(name, value);
412        }
413        else {
414            transientVars.remove(name);
415        }
416    }
417
418    public boolean hasTransientVar(String name) {
419        return transientVars.containsKey(name);
420    }
421
422    public Object getTransientVar(String name) {
423        return transientVars.get(name);
424    }
425
426    public boolean hasEnded() {
427        return status.isEndState();
428    }
429
430    private List<String> terminateNodes(Status endStatus) {
431        List<String> endNodes = new ArrayList<String>();
432        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
433            if (entry.getValue().started) {
434                NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
435                if (!(nodeDef instanceof ControlNodeDef)) {
436                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
437                    try {
438                        if (endStatus == Status.KILLED) {
439                            nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
440                        }
441                        else {
442                            if (endStatus == Status.FAILED) {
443                                nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
444                            }
445                        }
446                        endNodes.add(nodeDef.getName());
447                    }
448                    catch (Exception ex) {
449                        log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
450                                 nodeDef.getName(), ex);
451                    }
452                }
453            }
454        }
455        return endNodes;
456    }
457
458    private String failNode(String nodeName) {
459        String failedNode = null;
460        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
461            String node = entry.getKey();
462            NodeInstance nodeInstance = entry.getValue();
463            if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
464                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
465                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
466                try {
467                    nodeHandler.fail(new Context(nodeDef, node, null));
468                    failedNode = nodeDef.getName();
469                    nodeInstance.started = false;
470                }
471                catch (Exception ex) {
472                    log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
473                }
474                return failedNode;
475            }
476        }
477        return failedNode;
478    }
479
480    private List<String> killNodes() {
481        List<String> killedNodes = new ArrayList<String>();
482        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
483            String node = entry.getKey();
484            NodeInstance nodeInstance = entry.getValue();
485            if (nodeInstance.started) {
486                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
487                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
488                try {
489                    nodeHandler.kill(new Context(nodeDef, node, null));
490                    killedNodes.add(nodeDef.getName());
491                }
492                catch (Exception ex) {
493                    log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
494                }
495            }
496        }
497        return killedNodes;
498    }
499
500    public LiteWorkflowApp getProcessDefinition() {
501        return def;
502    }
503
504    private static String createChildPath(String path, String child) {
505        return path + child + PATH_SEPARATOR;
506    }
507
508    private static String getParentPath(String path) {
509        path = path.substring(0, path.length() - 1);
510        return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
511    }
512
513    private static String createFullTransition(String executionPath, String transition) {
514        return executionPath + TRANSITION_SEPARATOR + transition;
515    }
516
517    private static String getExecutionPath(String fullTransition) {
518        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
519        if (index == -1) {
520            throw new IllegalArgumentException("Invalid fullTransition");
521        }
522        return fullTransition.substring(0, index);
523    }
524
525    private static String getTransitionNode(String fullTransition) {
526        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
527        if (index == -1) {
528            throw new IllegalArgumentException("Invalid fullTransition");
529        }
530        return fullTransition.substring(index + 1);
531    }
532
533    private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
534        return (NodeHandler) ReflectionUtils.newInstance(handler, null);
535    }
536
537    private void refreshLog() {
538        XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
539        XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
540        XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
541        XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
542        XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
543        log = XLog.getLog(getClass());
544    }
545
546    public Status getStatus() {
547        return status;
548    }
549
550    public void setStatus(Status status) {
551        this.status = status;
552    }
553
554    @Override
555    public void write(DataOutput dOut) throws IOException {
556
557        dOut.writeUTF(instanceId);
558
559        //Hadoop Configuration has to get its act right
560        ByteArrayOutputStream baos = new ByteArrayOutputStream();
561        conf.writeXml(baos);
562        baos.close();
563        byte[] array = baos.toByteArray();
564        dOut.writeInt(array.length);
565        dOut.write(array);
566
567        def.write(dOut);
568        dOut.writeUTF(status.toString());
569        dOut.writeInt(executionPaths.size());
570        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
571            dOut.writeUTF(entry.getKey());
572            dOut.writeUTF(entry.getValue().nodeName);
573            dOut.writeBoolean(entry.getValue().started);
574        }
575        dOut.writeInt(persistentVars.size());
576        for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
577            dOut.writeUTF(entry.getKey());
578            StringSerializationUtil.writeString(dOut, entry.getValue());
579        }
580    }
581
582    @Override
583    public void readFields(DataInput dIn) throws IOException {
584        instanceId = dIn.readUTF();
585
586        //Hadoop Configuration has to get its act right
587        int len = dIn.readInt();
588        byte[] array = new byte[len];
589        dIn.readFully(array);
590        ByteArrayInputStream bais = new ByteArrayInputStream(array);
591        conf = new XConfiguration(bais);
592
593        def = new LiteWorkflowApp();
594        def.readFields(dIn);
595        status = Status.valueOf(dIn.readUTF());
596        int numExPaths = dIn.readInt();
597        for (int x = 0; x < numExPaths; x++) {
598            String path = dIn.readUTF();
599            String nodeName = dIn.readUTF();
600            boolean isStarted = dIn.readBoolean();
601            NodeInstance nodeInstance = new NodeInstance(nodeName);
602            nodeInstance.started = isStarted;
603            executionPaths.put(path, nodeInstance);
604        }
605        int numVars = dIn.readInt();
606        for (int x = 0; x < numVars; x++) {
607            String vName = dIn.readUTF();
608            persistentVars.put(vName, StringSerializationUtil.readString(dIn));
609        }
610        refreshLog();
611    }
612
613    @Override
614    public Configuration getConf() {
615        return conf;
616    }
617
618    @Override
619    public WorkflowApp getApp() {
620        return def;
621    }
622
623    @Override
624    public String getId() {
625        return instanceId;
626    }
627
628    @Override
629    public String getTransition(String node) {
630        return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
631    }
632
633    @Override
634    public boolean equals(Object o) {
635        return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
636    }
637
638    @Override
639    public int hashCode() {
640        return instanceId.hashCode();
641    }
642}