001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.workflow.lite;
019
020import org.apache.oozie.service.XLogService;
021import org.apache.oozie.service.DagXLogInfoService;
022import org.apache.oozie.client.OozieClient;
023import org.apache.hadoop.io.Writable;
024import org.apache.hadoop.util.ReflectionUtils;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.workflow.WorkflowApp;
027import org.apache.oozie.workflow.WorkflowException;
028import org.apache.oozie.workflow.WorkflowInstance;
029import org.apache.oozie.util.ParamChecker;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.util.XConfiguration;
032import org.apache.oozie.ErrorCode;
033
034import java.io.DataInput;
035import java.io.DataOutput;
036import java.io.IOException;
037import java.io.ByteArrayOutputStream;
038import java.io.ByteArrayInputStream;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collections;
042import java.util.Comparator;
043import java.util.Date;
044import java.util.HashMap;
045import java.util.List;
046import java.util.Map;
047
048//TODO javadoc
049public class LiteWorkflowInstance implements Writable, WorkflowInstance {
050    private static final String TRANSITION_TO = "transition.to";
051
052    private final Date FAR_INTO_THE_FUTURE = new Date(Long.MAX_VALUE);
053
054    private XLog log;
055
056    private static String PATH_SEPARATOR = "/";
057    private static String ROOT = PATH_SEPARATOR;
058    private static String TRANSITION_SEPARATOR = "#";
059
060    // Using unique string to indicate version. This is to make sure that it
061    // doesn't match with user data.
062    private static final String DATA_VERSION = "V==1";
063
064    private static class NodeInstance {
065        String nodeName;
066        boolean started = false;
067
068        private NodeInstance(String nodeName) {
069            this.nodeName = nodeName;
070        }
071    }
072
073    private class Context implements NodeHandler.Context {
074        private NodeDef nodeDef;
075        private String executionPath;
076        private String exitState;
077        private Status status = Status.RUNNING;
078
079        private Context(NodeDef nodeDef, String executionPath, String exitState) {
080            this.nodeDef = nodeDef;
081            this.executionPath = executionPath;
082            this.exitState = exitState;
083        }
084
085        public NodeDef getNodeDef() {
086            return nodeDef;
087        }
088
089        public String getExecutionPath() {
090            return executionPath;
091        }
092
093        public String getParentExecutionPath(String executionPath) {
094            return LiteWorkflowInstance.getParentPath(executionPath);
095        }
096
097        public String getSignalValue() {
098            return exitState;
099        }
100
101        public String createExecutionPath(String name) {
102            return LiteWorkflowInstance.createChildPath(executionPath, name);
103        }
104
105        public String createFullTransition(String executionPath, String transition) {
106            return LiteWorkflowInstance.createFullTransition(executionPath, transition);
107        }
108
109        public void deleteExecutionPath() {
110            if (!executionPaths.containsKey(executionPath)) {
111                throw new IllegalStateException();
112            }
113            executionPaths.remove(executionPath);
114            executionPath = LiteWorkflowInstance.getParentPath(executionPath);
115        }
116
117        public void failJob() {
118            status = Status.FAILED;
119        }
120
121        public void killJob() {
122            status = Status.KILLED;
123        }
124
125        public void completeJob() {
126            status = Status.SUCCEEDED;
127        }
128
129        @Override
130        public Object getTransientVar(String name) {
131            return LiteWorkflowInstance.this.getTransientVar(name);
132        }
133
134        @Override
135        public String getVar(String name) {
136            return LiteWorkflowInstance.this.getVar(name);
137        }
138
139        @Override
140        public void setTransientVar(String name, Object value) {
141            LiteWorkflowInstance.this.setTransientVar(name, value);
142        }
143
144        @Override
145        public void setVar(String name, String value) {
146            LiteWorkflowInstance.this.setVar(name, value);
147        }
148
149        @Override
150        public LiteWorkflowInstance getProcessInstance() {
151            return LiteWorkflowInstance.this;
152        }
153
154    }
155
156    private LiteWorkflowApp def;
157    private Configuration conf;
158    private String instanceId;
159    private Status status;
160    private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
161    private Map<String, String> persistentVars = new HashMap<String, String>();
162    private Map<String, Object> transientVars = new HashMap<String, Object>();
163    private ActionEndTimesComparator actionEndTimesComparator = null;
164
165    protected LiteWorkflowInstance() {
166        log = XLog.getLog(getClass());
167    }
168
169    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
170        this();
171        this.def = ParamChecker.notNull(def, "def");
172        this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
173        this.conf = ParamChecker.notNull(conf, "conf");
174        refreshLog();
175        status = Status.PREP;
176    }
177
178    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId, Map<String, Date> actionEndTimes) {
179        this(def, conf, instanceId);
180        actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes);
181    }
182
183    public synchronized boolean start() throws WorkflowException {
184        if (status != Status.PREP) {
185            throw new WorkflowException(ErrorCode.E0719);
186        }
187        log.debug(XLog.STD, "Starting job");
188        status = Status.RUNNING;
189        executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
190        return signal(ROOT, StartNodeDef.START);
191    }
192
193    //todo if suspended store signal and use when resuming
194
195    public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
196        ParamChecker.notEmpty(executionPath, "executionPath");
197        ParamChecker.notNull(signalValue, "signalValue");
198        log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue);
199        if (status != Status.RUNNING) {
200            throw new WorkflowException(ErrorCode.E0716);
201        }
202        NodeInstance nodeJob = executionPaths.get(executionPath);
203        if (nodeJob == null) {
204            status = Status.FAILED;
205            log.error("invalid execution path [{0}]", executionPath);
206        }
207        NodeDef nodeDef = null;
208        if (!status.isEndState()) {
209            nodeDef = def.getNode(nodeJob.nodeName);
210            if (nodeDef == null) {
211                status = Status.FAILED;
212                log.error("invalid transition [{0}]", nodeJob.nodeName);
213            }
214        }
215        if (!status.isEndState()) {
216            NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
217            boolean exiting = true;
218
219            Context context = new Context(nodeDef, executionPath, signalValue);
220            if (!nodeJob.started) {
221                try {
222                    nodeHandler.loopDetection(context);
223                    exiting = nodeHandler.enter(context);
224                    nodeJob.started = true;
225                }
226                catch (WorkflowException ex) {
227                    status = Status.FAILED;
228                    List<String> killedNodes = terminateNodes(Status.KILLED);
229                    if (killedNodes.size() > 1) {
230                        log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
231                                .size());
232                    }
233                    throw ex;
234                }
235            }
236
237            if (exiting) {
238                List<String> pathsToStart = new ArrayList<String>();
239                List<String> fullTransitions;
240                try {
241                    fullTransitions = nodeHandler.multiExit(context);
242                    int last = fullTransitions.size() - 1;
243                    // TEST THIS
244                    if (last >= 0) {
245                        String transitionTo = getTransitionNode(fullTransitions.get(last));
246                        if (nodeDef instanceof ForkNodeDef) {
247                            transitionTo = "*"; // WF action cannot hold all transitions for a fork.
248                                                // transitions are hardcoded in the WF app.
249                        }
250                        persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
251                                           transitionTo);
252                    }
253                }
254                catch (WorkflowException ex) {
255                    status = Status.FAILED;
256                    throw ex;
257                }
258
259                if (context.status == Status.KILLED) {
260                    status = Status.KILLED;
261                    log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
262                }
263                else {
264                    if (context.status == Status.FAILED) {
265                        status = Status.FAILED;
266                        log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
267                    }
268                    else {
269                        if (context.status == Status.SUCCEEDED) {
270                            status = Status.SUCCEEDED;
271                            log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
272                        }
273/*
274                else if (context.status == Status.SUSPENDED) {
275                    status = Status.SUSPENDED;
276                    log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
277                }
278*/
279                        else {
280                            for (String fullTransition : fullTransitions) {
281                                // this is the whole trick for forking, we need the
282                                // executionpath and the transition
283                                // in the case of no forking last element of
284                                // executionpath is different from transition
285                                // in the case of forking they are the same
286
287                                log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
288                                          fullTransition);
289
290                                String execPathFromTransition = getExecutionPath(fullTransition);
291                                String transition = getTransitionNode(fullTransition);
292                                def.validateTransition(nodeJob.nodeName, transition);
293
294                                NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
295                                if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
296                                    // TODO explain this IF better
297                                    // If the WfJob is signaled with the parent
298                                    // execution executionPath again
299                                    // The Fork node will execute again.. and replace
300                                    // the Node WorkflowJobBean
301                                    // so this is required to prevent that..
302                                    // Question : Should we throw an error in this case
303                                    // ??
304                                    executionPaths.put(execPathFromTransition, new NodeInstance(transition));
305                                    pathsToStart.add(execPathFromTransition);
306                                }
307
308                            }
309
310                            // If we're doing a rerun, then we need to make sure to put the actions in pathToStart into the order
311                            // that they ended in.  Otherwise, it could result in an error later on in some edge cases.
312                            // e.g. You have a fork with two nodes, A and B, that both succeeded, followed by a join and some more
313                            // nodes, some of which failed.  If you do the rerun, it will always signal A and then B, even if in the
314                            // original run B signaled first and then A.  By sorting this, we maintain the proper signal ordering.
315                            if (actionEndTimesComparator != null && pathsToStart.size() > 1) {
316                                Collections.sort(pathsToStart, actionEndTimesComparator);
317                            }
318
319                            // signal all new synch transitions
320                            for (String pathToStart : pathsToStart) {
321                                signal(pathToStart, "::synch::");
322                            }
323                        }
324                    }
325                }
326            }
327        }
328        if (status.isEndState()) {
329            if (status == Status.FAILED) {
330                List<String> failedNodes = terminateNodes(status);
331                log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
332                        .size());
333            }
334            else {
335                List<String> killedNodes = terminateNodes(Status.KILLED);
336
337                if (killedNodes.size() > 1) {
338                    log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
339                            .size());
340                }
341            }
342        }
343        return status.isEndState();
344    }
345
346    /**
347     * Get NodeDef from workflow instance
348     * @param executionPath execution path
349     * @return node def
350     */
351    public NodeDef getNodeDef(String executionPath) {
352        NodeInstance nodeJob = executionPaths.get(executionPath);
353        NodeDef nodeDef = null;
354        if (nodeJob == null) {
355            log.error("invalid execution path [{0}]", executionPath);
356        }
357        else {
358            nodeDef = def.getNode(nodeJob.nodeName);
359            if (nodeDef == null) {
360                log.error("invalid transition [{0}]", nodeJob.nodeName);
361            }
362        }
363        return nodeDef;
364    }
365
366    public synchronized void fail(String nodeName) throws WorkflowException {
367        if (status.isEndState()) {
368            throw new WorkflowException(ErrorCode.E0718);
369        }
370        String failedNode = failNode(nodeName);
371        if (failedNode != null) {
372            log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
373        }
374        else {
375            //TODO failed attempting to fail the action. EXCEPTION
376        }
377        List<String> killedNodes = killNodes();
378        if (killedNodes.size() > 1) {
379            log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
380        }
381        status = Status.FAILED;
382    }
383
384    public synchronized void kill() throws WorkflowException {
385        if (status.isEndState()) {
386            throw new WorkflowException(ErrorCode.E0718);
387        }
388        log.debug(XLog.STD, "Killing job");
389        List<String> killedNodes = killNodes();
390        if (killedNodes.size() > 1) {
391            log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
392        }
393        status = Status.KILLED;
394    }
395
396    public synchronized void suspend() throws WorkflowException {
397        if (status != Status.RUNNING) {
398            throw new WorkflowException(ErrorCode.E0716);
399        }
400        log.debug(XLog.STD, "Suspending job");
401        this.status = Status.SUSPENDED;
402    }
403
404    public boolean isSuspended() {
405        return (status == Status.SUSPENDED);
406    }
407
408    public synchronized void resume() throws WorkflowException {
409        if (status != Status.SUSPENDED) {
410            throw new WorkflowException(ErrorCode.E0717);
411        }
412        log.debug(XLog.STD, "Resuming job");
413        status = Status.RUNNING;
414    }
415
416    public void setVar(String name, String value) {
417        if (value != null) {
418            persistentVars.put(name, value);
419        }
420        else {
421            persistentVars.remove(name);
422        }
423    }
424
425    @Override
426    public Map<String, String> getAllVars() {
427        return persistentVars;
428    }
429
430    @Override
431    public void setAllVars(Map<String, String> varMap) {
432        persistentVars.putAll(varMap);
433    }
434
435    public String getVar(String name) {
436        return persistentVars.get(name);
437    }
438
439
440    public void setTransientVar(String name, Object value) {
441        if (value != null) {
442            transientVars.put(name, value);
443        }
444        else {
445            transientVars.remove(name);
446        }
447    }
448
449    public boolean hasTransientVar(String name) {
450        return transientVars.containsKey(name);
451    }
452
453    public Object getTransientVar(String name) {
454        return transientVars.get(name);
455    }
456
457    public boolean hasEnded() {
458        return status.isEndState();
459    }
460
461    private List<String> terminateNodes(Status endStatus) {
462        List<String> endNodes = new ArrayList<String>();
463        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
464            if (entry.getValue().started) {
465                NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
466                if (!(nodeDef instanceof ControlNodeDef)) {
467                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
468                    try {
469                        if (endStatus == Status.KILLED) {
470                            nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
471                        }
472                        else {
473                            if (endStatus == Status.FAILED) {
474                                nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
475                            }
476                        }
477                        endNodes.add(nodeDef.getName());
478                    }
479                    catch (Exception ex) {
480                        log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
481                                 nodeDef.getName(), ex);
482                    }
483                }
484            }
485        }
486        return endNodes;
487    }
488
489    private String failNode(String nodeName) {
490        String failedNode = null;
491        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
492            String node = entry.getKey();
493            NodeInstance nodeInstance = entry.getValue();
494            if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
495                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
496                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
497                try {
498                    nodeHandler.fail(new Context(nodeDef, node, null));
499                    failedNode = nodeDef.getName();
500                    nodeInstance.started = false;
501                }
502                catch (Exception ex) {
503                    log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
504                }
505                return failedNode;
506            }
507        }
508        return failedNode;
509    }
510
511    private List<String> killNodes() {
512        List<String> killedNodes = new ArrayList<String>();
513        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
514            String node = entry.getKey();
515            NodeInstance nodeInstance = entry.getValue();
516            if (nodeInstance.started) {
517                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
518                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
519                try {
520                    nodeHandler.kill(new Context(nodeDef, node, null));
521                    killedNodes.add(nodeDef.getName());
522                }
523                catch (Exception ex) {
524                    log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
525                }
526            }
527        }
528        return killedNodes;
529    }
530
531    public LiteWorkflowApp getProcessDefinition() {
532        return def;
533    }
534
535    private static String createChildPath(String path, String child) {
536        return path + child + PATH_SEPARATOR;
537    }
538
539    private static String getParentPath(String path) {
540        path = path.substring(0, path.length() - 1);
541        return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
542    }
543
544    private static String createFullTransition(String executionPath, String transition) {
545        return executionPath + TRANSITION_SEPARATOR + transition;
546    }
547
548    private static String getExecutionPath(String fullTransition) {
549        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
550        if (index == -1) {
551            throw new IllegalArgumentException("Invalid fullTransition");
552        }
553        return fullTransition.substring(0, index);
554    }
555
556    private static String getTransitionNode(String fullTransition) {
557        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
558        if (index == -1) {
559            throw new IllegalArgumentException("Invalid fullTransition");
560        }
561        return fullTransition.substring(index + 1);
562    }
563
564    private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
565        return (NodeHandler) ReflectionUtils.newInstance(handler, null);
566    }
567
568    private void refreshLog() {
569        XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
570        XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
571        XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
572        XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
573        XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
574        log = XLog.getLog(getClass());
575    }
576
577    public Status getStatus() {
578        return status;
579    }
580
581    public void setStatus(Status status) {
582        this.status = status;
583    }
584
585    @Override
586    public void write(DataOutput dOut) throws IOException {
587        dOut.writeUTF(instanceId);
588
589        //Hadoop Configuration has to get its act right
590        ByteArrayOutputStream baos = new ByteArrayOutputStream();
591        conf.writeXml(baos);
592        baos.close();
593        byte[] array = baos.toByteArray();
594        dOut.writeInt(array.length);
595        dOut.write(array);
596
597        def.write(dOut);
598        dOut.writeUTF(status.toString());
599        dOut.writeInt(executionPaths.size());
600        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
601            dOut.writeUTF(entry.getKey());
602            dOut.writeUTF(entry.getValue().nodeName);
603            dOut.writeBoolean(entry.getValue().started);
604        }
605        dOut.writeInt(persistentVars.size());
606        for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
607            dOut.writeUTF(entry.getKey());
608            writeStringAsBytes(entry.getValue(), dOut);
609        }
610        if (actionEndTimesComparator != null) {
611            Map<String, Date> actionEndTimes = actionEndTimesComparator.getActionEndTimes();
612            dOut.writeInt(actionEndTimes.size());
613            for (Map.Entry<String, Date> entry : actionEndTimes.entrySet()) {
614                dOut.writeUTF(entry.getKey());
615                dOut.writeLong(entry.getValue().getTime());
616            }
617        }
618    }
619
620    @Override
621    public void readFields(DataInput dIn) throws IOException {
622        instanceId = dIn.readUTF();
623
624        //Hadoop Configuration has to get its act right
625        int len = dIn.readInt();
626        byte[] array = new byte[len];
627        dIn.readFully(array);
628        ByteArrayInputStream bais = new ByteArrayInputStream(array);
629        conf = new XConfiguration(bais);
630
631        def = new LiteWorkflowApp();
632        def.readFields(dIn);
633        status = Status.valueOf(dIn.readUTF());
634        int numExPaths = dIn.readInt();
635        for (int x = 0; x < numExPaths; x++) {
636            String path = dIn.readUTF();
637            String nodeName = dIn.readUTF();
638            boolean isStarted = dIn.readBoolean();
639            NodeInstance nodeInstance = new NodeInstance(nodeName);
640            nodeInstance.started = isStarted;
641            executionPaths.put(path, nodeInstance);
642        }
643        int numVars = dIn.readInt();
644        for (int x = 0; x < numVars; x++) {
645            String vName = dIn.readUTF();
646            String vVal = readBytesAsString(dIn);
647            persistentVars.put(vName, vVal);
648        }
649        int numActionEndTimes = -1;
650        try {
651            numActionEndTimes = dIn.readInt();
652        } catch (IOException ioe) {
653            // This means that there isn't an actionEndTimes, so just ignore
654        }
655        if (numActionEndTimes > 0) {
656            Map<String, Date> actionEndTimes = new HashMap<String, Date>(numActionEndTimes);
657            for (int x = 0; x < numActionEndTimes; x++) {
658                String name = dIn.readUTF();
659                long endTime = dIn.readLong();
660                actionEndTimes.put(name, new Date(endTime));
661            }
662            actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes);
663        }
664        refreshLog();
665    }
666
667    private void writeStringAsBytes(String value, DataOutput dOut) throws IOException {
668        if (value == null) {
669            dOut.writeUTF(null);
670            return;
671        }
672        dOut.writeUTF(DATA_VERSION);
673        byte[] data = value.getBytes("UTF-8");
674        dOut.writeInt(data.length);
675        dOut.write(data);
676    }
677
678    private String readBytesAsString(DataInput dIn) throws IOException {
679        String value = dIn.readUTF();
680        if (value != null && value.equals(DATA_VERSION)) {
681            int length = dIn.readInt();
682            byte[] data = new byte[length];
683            dIn.readFully(data);
684            value = new String(data, "UTF-8");
685        }
686        return value;
687    }
688
689    @Override
690    public Configuration getConf() {
691        return conf;
692    }
693
694    @Override
695    public WorkflowApp getApp() {
696        return def;
697    }
698
699    @Override
700    public String getId() {
701        return instanceId;
702    }
703
704    @Override
705    public String getTransition(String node) {
706        return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
707    }
708
709    @Override
710    public boolean equals(Object o) {
711        return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
712    }
713
714    @Override
715    public int hashCode() {
716        return instanceId.hashCode();
717    }
718
719    private class ActionEndTimesComparator implements Comparator<String> {
720
721        private final Map<String, Date> actionEndTimes;
722
723        public ActionEndTimesComparator(Map<String, Date> actionEndTimes) {
724            this.actionEndTimes = actionEndTimes;
725        }
726
727        @Override
728        public int compare(String node1, String node2) {
729            Date date1 = null;
730            Date date2 = null;
731            NodeInstance node1Instance = executionPaths.get(node1);
732            if (node1Instance != null) {
733                date1 = this.actionEndTimes.get(node1Instance.nodeName);
734            }
735            NodeInstance node2Instance = executionPaths.get(node2);
736            if (node2Instance != null) {
737                date2 = this.actionEndTimes.get(node2Instance.nodeName);
738            }
739            date1 = (date1 == null) ? FAR_INTO_THE_FUTURE : date1;
740            date2 = (date2 == null) ? FAR_INTO_THE_FUTURE : date2;
741            return date1.compareTo(date2);
742        }
743
744        public Map<String, Date> getActionEndTimes() {
745            return actionEndTimes;
746        }
747    }
748}