001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.workflow.lite; 019 020import org.apache.oozie.service.XLogService; 021import org.apache.oozie.service.DagXLogInfoService; 022import org.apache.oozie.client.OozieClient; 023import org.apache.hadoop.io.Writable; 024import org.apache.hadoop.util.ReflectionUtils; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.oozie.workflow.WorkflowApp; 027import org.apache.oozie.workflow.WorkflowException; 028import org.apache.oozie.workflow.WorkflowInstance; 029import org.apache.oozie.util.ParamChecker; 030import org.apache.oozie.util.XLog; 031import org.apache.oozie.util.XConfiguration; 032import org.apache.oozie.ErrorCode; 033 034import java.io.DataInput; 035import java.io.DataOutput; 036import java.io.IOException; 037import java.io.ByteArrayOutputStream; 038import java.io.ByteArrayInputStream; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.Collections; 042import java.util.Comparator; 043import java.util.Date; 044import java.util.HashMap; 045import java.util.List; 046import java.util.Map; 047 048//TODO javadoc 049public class LiteWorkflowInstance implements Writable, WorkflowInstance { 050 private static final String TRANSITION_TO = "transition.to"; 051 052 private final Date FAR_INTO_THE_FUTURE = new Date(Long.MAX_VALUE); 053 054 private XLog log; 055 056 private static String PATH_SEPARATOR = "/"; 057 private static String ROOT = PATH_SEPARATOR; 058 private static String TRANSITION_SEPARATOR = "#"; 059 060 // Using unique string to indicate version. This is to make sure that it 061 // doesn't match with user data. 062 private static final String DATA_VERSION = "V==1"; 063 064 private static class NodeInstance { 065 String nodeName; 066 boolean started = false; 067 068 private NodeInstance(String nodeName) { 069 this.nodeName = nodeName; 070 } 071 } 072 073 private class Context implements NodeHandler.Context { 074 private NodeDef nodeDef; 075 private String executionPath; 076 private String exitState; 077 private Status status = Status.RUNNING; 078 079 private Context(NodeDef nodeDef, String executionPath, String exitState) { 080 this.nodeDef = nodeDef; 081 this.executionPath = executionPath; 082 this.exitState = exitState; 083 } 084 085 public NodeDef getNodeDef() { 086 return nodeDef; 087 } 088 089 public String getExecutionPath() { 090 return executionPath; 091 } 092 093 public String getParentExecutionPath(String executionPath) { 094 return LiteWorkflowInstance.getParentPath(executionPath); 095 } 096 097 public String getSignalValue() { 098 return exitState; 099 } 100 101 public String createExecutionPath(String name) { 102 return LiteWorkflowInstance.createChildPath(executionPath, name); 103 } 104 105 public String createFullTransition(String executionPath, String transition) { 106 return LiteWorkflowInstance.createFullTransition(executionPath, transition); 107 } 108 109 public void deleteExecutionPath() { 110 if (!executionPaths.containsKey(executionPath)) { 111 throw new IllegalStateException(); 112 } 113 executionPaths.remove(executionPath); 114 executionPath = LiteWorkflowInstance.getParentPath(executionPath); 115 } 116 117 public void failJob() { 118 status = Status.FAILED; 119 } 120 121 public void killJob() { 122 status = Status.KILLED; 123 } 124 125 public void completeJob() { 126 status = Status.SUCCEEDED; 127 } 128 129 @Override 130 public Object getTransientVar(String name) { 131 return LiteWorkflowInstance.this.getTransientVar(name); 132 } 133 134 @Override 135 public String getVar(String name) { 136 return LiteWorkflowInstance.this.getVar(name); 137 } 138 139 @Override 140 public void setTransientVar(String name, Object value) { 141 LiteWorkflowInstance.this.setTransientVar(name, value); 142 } 143 144 @Override 145 public void setVar(String name, String value) { 146 LiteWorkflowInstance.this.setVar(name, value); 147 } 148 149 @Override 150 public LiteWorkflowInstance getProcessInstance() { 151 return LiteWorkflowInstance.this; 152 } 153 154 } 155 156 private LiteWorkflowApp def; 157 private Configuration conf; 158 private String instanceId; 159 private Status status; 160 private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>(); 161 private Map<String, String> persistentVars = new HashMap<String, String>(); 162 private Map<String, Object> transientVars = new HashMap<String, Object>(); 163 private ActionEndTimesComparator actionEndTimesComparator = null; 164 165 protected LiteWorkflowInstance() { 166 log = XLog.getLog(getClass()); 167 } 168 169 public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) { 170 this(); 171 this.def = ParamChecker.notNull(def, "def"); 172 this.instanceId = ParamChecker.notNull(instanceId, "instanceId"); 173 this.conf = ParamChecker.notNull(conf, "conf"); 174 refreshLog(); 175 status = Status.PREP; 176 } 177 178 public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId, Map<String, Date> actionEndTimes) { 179 this(def, conf, instanceId); 180 actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes); 181 } 182 183 public synchronized boolean start() throws WorkflowException { 184 if (status != Status.PREP) { 185 throw new WorkflowException(ErrorCode.E0719); 186 } 187 log.debug(XLog.STD, "Starting job"); 188 status = Status.RUNNING; 189 executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START)); 190 return signal(ROOT, StartNodeDef.START); 191 } 192 193 //todo if suspended store signal and use when resuming 194 195 public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException { 196 ParamChecker.notEmpty(executionPath, "executionPath"); 197 ParamChecker.notNull(signalValue, "signalValue"); 198 log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue); 199 if (status != Status.RUNNING) { 200 throw new WorkflowException(ErrorCode.E0716); 201 } 202 NodeInstance nodeJob = executionPaths.get(executionPath); 203 if (nodeJob == null) { 204 status = Status.FAILED; 205 log.error("invalid execution path [{0}]", executionPath); 206 } 207 NodeDef nodeDef = null; 208 if (!status.isEndState()) { 209 nodeDef = def.getNode(nodeJob.nodeName); 210 if (nodeDef == null) { 211 status = Status.FAILED; 212 log.error("invalid transition [{0}]", nodeJob.nodeName); 213 } 214 } 215 if (!status.isEndState()) { 216 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass()); 217 boolean exiting = true; 218 219 Context context = new Context(nodeDef, executionPath, signalValue); 220 if (!nodeJob.started) { 221 try { 222 nodeHandler.loopDetection(context); 223 exiting = nodeHandler.enter(context); 224 nodeJob.started = true; 225 } 226 catch (WorkflowException ex) { 227 status = Status.FAILED; 228 List<String> killedNodes = terminateNodes(Status.KILLED); 229 if (killedNodes.size() > 1) { 230 log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes 231 .size()); 232 } 233 throw ex; 234 } 235 } 236 237 if (exiting) { 238 List<String> pathsToStart = new ArrayList<String>(); 239 List<String> fullTransitions; 240 try { 241 fullTransitions = nodeHandler.multiExit(context); 242 int last = fullTransitions.size() - 1; 243 // TEST THIS 244 if (last >= 0) { 245 String transitionTo = getTransitionNode(fullTransitions.get(last)); 246 if (nodeDef instanceof ForkNodeDef) { 247 transitionTo = "*"; // WF action cannot hold all transitions for a fork. 248 // transitions are hardcoded in the WF app. 249 } 250 persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO, 251 transitionTo); 252 } 253 } 254 catch (WorkflowException ex) { 255 status = Status.FAILED; 256 throw ex; 257 } 258 259 if (context.status == Status.KILLED) { 260 status = Status.KILLED; 261 log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName); 262 } 263 else { 264 if (context.status == Status.FAILED) { 265 status = Status.FAILED; 266 log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName); 267 } 268 else { 269 if (context.status == Status.SUCCEEDED) { 270 status = Status.SUCCEEDED; 271 log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName); 272 } 273/* 274 else if (context.status == Status.SUSPENDED) { 275 status = Status.SUSPENDED; 276 log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName); 277 } 278*/ 279 else { 280 for (String fullTransition : fullTransitions) { 281 // this is the whole trick for forking, we need the 282 // executionpath and the transition 283 // in the case of no forking last element of 284 // executionpath is different from transition 285 // in the case of forking they are the same 286 287 log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName, 288 fullTransition); 289 290 String execPathFromTransition = getExecutionPath(fullTransition); 291 String transition = getTransitionNode(fullTransition); 292 def.validateTransition(nodeJob.nodeName, transition); 293 294 NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition); 295 if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) { 296 // TODO explain this IF better 297 // If the WfJob is signaled with the parent 298 // execution executionPath again 299 // The Fork node will execute again.. and replace 300 // the Node WorkflowJobBean 301 // so this is required to prevent that.. 302 // Question : Should we throw an error in this case 303 // ?? 304 executionPaths.put(execPathFromTransition, new NodeInstance(transition)); 305 pathsToStart.add(execPathFromTransition); 306 } 307 308 } 309 310 // If we're doing a rerun, then we need to make sure to put the actions in pathToStart into the order 311 // that they ended in. Otherwise, it could result in an error later on in some edge cases. 312 // e.g. You have a fork with two nodes, A and B, that both succeeded, followed by a join and some more 313 // nodes, some of which failed. If you do the rerun, it will always signal A and then B, even if in the 314 // original run B signaled first and then A. By sorting this, we maintain the proper signal ordering. 315 if (actionEndTimesComparator != null && pathsToStart.size() > 1) { 316 Collections.sort(pathsToStart, actionEndTimesComparator); 317 } 318 319 // signal all new synch transitions 320 for (String pathToStart : pathsToStart) { 321 signal(pathToStart, "::synch::"); 322 } 323 } 324 } 325 } 326 } 327 } 328 if (status.isEndState()) { 329 if (status == Status.FAILED) { 330 List<String> failedNodes = terminateNodes(status); 331 log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes 332 .size()); 333 } 334 else { 335 List<String> killedNodes = terminateNodes(Status.KILLED); 336 337 if (killedNodes.size() > 1) { 338 log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes 339 .size()); 340 } 341 } 342 } 343 return status.isEndState(); 344 } 345 346 /** 347 * Get NodeDef from workflow instance 348 * @param executionPath execution path 349 * @return node def 350 */ 351 public NodeDef getNodeDef(String executionPath) { 352 NodeInstance nodeJob = executionPaths.get(executionPath); 353 NodeDef nodeDef = null; 354 if (nodeJob == null) { 355 log.error("invalid execution path [{0}]", executionPath); 356 } 357 else { 358 nodeDef = def.getNode(nodeJob.nodeName); 359 if (nodeDef == null) { 360 log.error("invalid transition [{0}]", nodeJob.nodeName); 361 } 362 } 363 return nodeDef; 364 } 365 366 public synchronized void fail(String nodeName) throws WorkflowException { 367 if (status.isEndState()) { 368 throw new WorkflowException(ErrorCode.E0718); 369 } 370 String failedNode = failNode(nodeName); 371 if (failedNode != null) { 372 log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode); 373 } 374 else { 375 //TODO failed attempting to fail the action. EXCEPTION 376 } 377 List<String> killedNodes = killNodes(); 378 if (killedNodes.size() > 1) { 379 log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size()); 380 } 381 status = Status.FAILED; 382 } 383 384 public synchronized void kill() throws WorkflowException { 385 if (status.isEndState()) { 386 throw new WorkflowException(ErrorCode.E0718); 387 } 388 log.debug(XLog.STD, "Killing job"); 389 List<String> killedNodes = killNodes(); 390 if (killedNodes.size() > 1) { 391 log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size()); 392 } 393 status = Status.KILLED; 394 } 395 396 public synchronized void suspend() throws WorkflowException { 397 if (status != Status.RUNNING) { 398 throw new WorkflowException(ErrorCode.E0716); 399 } 400 log.debug(XLog.STD, "Suspending job"); 401 this.status = Status.SUSPENDED; 402 } 403 404 public boolean isSuspended() { 405 return (status == Status.SUSPENDED); 406 } 407 408 public synchronized void resume() throws WorkflowException { 409 if (status != Status.SUSPENDED) { 410 throw new WorkflowException(ErrorCode.E0717); 411 } 412 log.debug(XLog.STD, "Resuming job"); 413 status = Status.RUNNING; 414 } 415 416 public void setVar(String name, String value) { 417 if (value != null) { 418 persistentVars.put(name, value); 419 } 420 else { 421 persistentVars.remove(name); 422 } 423 } 424 425 @Override 426 public Map<String, String> getAllVars() { 427 return persistentVars; 428 } 429 430 @Override 431 public void setAllVars(Map<String, String> varMap) { 432 persistentVars.putAll(varMap); 433 } 434 435 public String getVar(String name) { 436 return persistentVars.get(name); 437 } 438 439 440 public void setTransientVar(String name, Object value) { 441 if (value != null) { 442 transientVars.put(name, value); 443 } 444 else { 445 transientVars.remove(name); 446 } 447 } 448 449 public boolean hasTransientVar(String name) { 450 return transientVars.containsKey(name); 451 } 452 453 public Object getTransientVar(String name) { 454 return transientVars.get(name); 455 } 456 457 public boolean hasEnded() { 458 return status.isEndState(); 459 } 460 461 private List<String> terminateNodes(Status endStatus) { 462 List<String> endNodes = new ArrayList<String>(); 463 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) { 464 if (entry.getValue().started) { 465 NodeDef nodeDef = def.getNode(entry.getValue().nodeName); 466 if (!(nodeDef instanceof ControlNodeDef)) { 467 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass()); 468 try { 469 if (endStatus == Status.KILLED) { 470 nodeHandler.kill(new Context(nodeDef, entry.getKey(), null)); 471 } 472 else { 473 if (endStatus == Status.FAILED) { 474 nodeHandler.fail(new Context(nodeDef, entry.getKey(), null)); 475 } 476 } 477 endNodes.add(nodeDef.getName()); 478 } 479 catch (Exception ex) { 480 log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(), 481 nodeDef.getName(), ex); 482 } 483 } 484 } 485 } 486 return endNodes; 487 } 488 489 private String failNode(String nodeName) { 490 String failedNode = null; 491 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) { 492 String node = entry.getKey(); 493 NodeInstance nodeInstance = entry.getValue(); 494 if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) { 495 NodeDef nodeDef = def.getNode(nodeInstance.nodeName); 496 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass()); 497 try { 498 nodeHandler.fail(new Context(nodeDef, node, null)); 499 failedNode = nodeDef.getName(); 500 nodeInstance.started = false; 501 } 502 catch (Exception ex) { 503 log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex); 504 } 505 return failedNode; 506 } 507 } 508 return failedNode; 509 } 510 511 private List<String> killNodes() { 512 List<String> killedNodes = new ArrayList<String>(); 513 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) { 514 String node = entry.getKey(); 515 NodeInstance nodeInstance = entry.getValue(); 516 if (nodeInstance.started) { 517 NodeDef nodeDef = def.getNode(nodeInstance.nodeName); 518 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass()); 519 try { 520 nodeHandler.kill(new Context(nodeDef, node, null)); 521 killedNodes.add(nodeDef.getName()); 522 } 523 catch (Exception ex) { 524 log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex); 525 } 526 } 527 } 528 return killedNodes; 529 } 530 531 public LiteWorkflowApp getProcessDefinition() { 532 return def; 533 } 534 535 private static String createChildPath(String path, String child) { 536 return path + child + PATH_SEPARATOR; 537 } 538 539 private static String getParentPath(String path) { 540 path = path.substring(0, path.length() - 1); 541 return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1); 542 } 543 544 private static String createFullTransition(String executionPath, String transition) { 545 return executionPath + TRANSITION_SEPARATOR + transition; 546 } 547 548 private static String getExecutionPath(String fullTransition) { 549 int index = fullTransition.indexOf(TRANSITION_SEPARATOR); 550 if (index == -1) { 551 throw new IllegalArgumentException("Invalid fullTransition"); 552 } 553 return fullTransition.substring(0, index); 554 } 555 556 private static String getTransitionNode(String fullTransition) { 557 int index = fullTransition.indexOf(TRANSITION_SEPARATOR); 558 if (index == -1) { 559 throw new IllegalArgumentException("Invalid fullTransition"); 560 } 561 return fullTransition.substring(index + 1); 562 } 563 564 private NodeHandler newInstance(Class<? extends NodeHandler> handler) { 565 return (NodeHandler) ReflectionUtils.newInstance(handler, null); 566 } 567 568 private void refreshLog() { 569 XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME)); 570 XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME)); 571 XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName()); 572 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, "")); 573 XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId); 574 log = XLog.getLog(getClass()); 575 } 576 577 public Status getStatus() { 578 return status; 579 } 580 581 public void setStatus(Status status) { 582 this.status = status; 583 } 584 585 @Override 586 public void write(DataOutput dOut) throws IOException { 587 dOut.writeUTF(instanceId); 588 589 //Hadoop Configuration has to get its act right 590 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 591 conf.writeXml(baos); 592 baos.close(); 593 byte[] array = baos.toByteArray(); 594 dOut.writeInt(array.length); 595 dOut.write(array); 596 597 def.write(dOut); 598 dOut.writeUTF(status.toString()); 599 dOut.writeInt(executionPaths.size()); 600 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) { 601 dOut.writeUTF(entry.getKey()); 602 dOut.writeUTF(entry.getValue().nodeName); 603 dOut.writeBoolean(entry.getValue().started); 604 } 605 dOut.writeInt(persistentVars.size()); 606 for (Map.Entry<String, String> entry : persistentVars.entrySet()) { 607 dOut.writeUTF(entry.getKey()); 608 writeStringAsBytes(entry.getValue(), dOut); 609 } 610 if (actionEndTimesComparator != null) { 611 Map<String, Date> actionEndTimes = actionEndTimesComparator.getActionEndTimes(); 612 dOut.writeInt(actionEndTimes.size()); 613 for (Map.Entry<String, Date> entry : actionEndTimes.entrySet()) { 614 dOut.writeUTF(entry.getKey()); 615 dOut.writeLong(entry.getValue().getTime()); 616 } 617 } 618 } 619 620 @Override 621 public void readFields(DataInput dIn) throws IOException { 622 instanceId = dIn.readUTF(); 623 624 //Hadoop Configuration has to get its act right 625 int len = dIn.readInt(); 626 byte[] array = new byte[len]; 627 dIn.readFully(array); 628 ByteArrayInputStream bais = new ByteArrayInputStream(array); 629 conf = new XConfiguration(bais); 630 631 def = new LiteWorkflowApp(); 632 def.readFields(dIn); 633 status = Status.valueOf(dIn.readUTF()); 634 int numExPaths = dIn.readInt(); 635 for (int x = 0; x < numExPaths; x++) { 636 String path = dIn.readUTF(); 637 String nodeName = dIn.readUTF(); 638 boolean isStarted = dIn.readBoolean(); 639 NodeInstance nodeInstance = new NodeInstance(nodeName); 640 nodeInstance.started = isStarted; 641 executionPaths.put(path, nodeInstance); 642 } 643 int numVars = dIn.readInt(); 644 for (int x = 0; x < numVars; x++) { 645 String vName = dIn.readUTF(); 646 String vVal = readBytesAsString(dIn); 647 persistentVars.put(vName, vVal); 648 } 649 int numActionEndTimes = -1; 650 try { 651 numActionEndTimes = dIn.readInt(); 652 } catch (IOException ioe) { 653 // This means that there isn't an actionEndTimes, so just ignore 654 } 655 if (numActionEndTimes > 0) { 656 Map<String, Date> actionEndTimes = new HashMap<String, Date>(numActionEndTimes); 657 for (int x = 0; x < numActionEndTimes; x++) { 658 String name = dIn.readUTF(); 659 long endTime = dIn.readLong(); 660 actionEndTimes.put(name, new Date(endTime)); 661 } 662 actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes); 663 } 664 refreshLog(); 665 } 666 667 private void writeStringAsBytes(String value, DataOutput dOut) throws IOException { 668 if (value == null) { 669 dOut.writeUTF(null); 670 return; 671 } 672 dOut.writeUTF(DATA_VERSION); 673 byte[] data = value.getBytes("UTF-8"); 674 dOut.writeInt(data.length); 675 dOut.write(data); 676 } 677 678 private String readBytesAsString(DataInput dIn) throws IOException { 679 String value = dIn.readUTF(); 680 if (value != null && value.equals(DATA_VERSION)) { 681 int length = dIn.readInt(); 682 byte[] data = new byte[length]; 683 dIn.readFully(data); 684 value = new String(data, "UTF-8"); 685 } 686 return value; 687 } 688 689 @Override 690 public Configuration getConf() { 691 return conf; 692 } 693 694 @Override 695 public WorkflowApp getApp() { 696 return def; 697 } 698 699 @Override 700 public String getId() { 701 return instanceId; 702 } 703 704 @Override 705 public String getTransition(String node) { 706 return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO); 707 } 708 709 @Override 710 public boolean equals(Object o) { 711 return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId); 712 } 713 714 @Override 715 public int hashCode() { 716 return instanceId.hashCode(); 717 } 718 719 private class ActionEndTimesComparator implements Comparator<String> { 720 721 private final Map<String, Date> actionEndTimes; 722 723 public ActionEndTimesComparator(Map<String, Date> actionEndTimes) { 724 this.actionEndTimes = actionEndTimes; 725 } 726 727 @Override 728 public int compare(String node1, String node2) { 729 Date date1 = null; 730 Date date2 = null; 731 NodeInstance node1Instance = executionPaths.get(node1); 732 if (node1Instance != null) { 733 date1 = this.actionEndTimes.get(node1Instance.nodeName); 734 } 735 NodeInstance node2Instance = executionPaths.get(node2); 736 if (node2Instance != null) { 737 date2 = this.actionEndTimes.get(node2Instance.nodeName); 738 } 739 date1 = (date1 == null) ? FAR_INTO_THE_FUTURE : date1; 740 date2 = (date2 == null) ? FAR_INTO_THE_FUTURE : date2; 741 return date1.compareTo(date2); 742 } 743 744 public Map<String, Date> getActionEndTimes() { 745 return actionEndTimes; 746 } 747 } 748}