/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.workflow.lite;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.util.StringSerializationUtil;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * In-memory, {@link Writable} implementation of a running "lite" workflow instance.
 * <p>
 * The instance tracks, per execution path (a {@code /}-separated path string, forked paths
 * nesting under their parent), which node is currently active and whether it has started.
 * Progress is driven by {@link #signal(String, String)}, which enters/exits node handlers
 * and fans out on forks. All state needed to resume the instance (id, configuration, app
 * definition, status, execution paths and persistent variables) is serialized via
 * {@link #write(DataOutput)} / {@link #readFields(DataInput)}; transient variables are not
 * persisted.
 * <p>
 * State-mutating operations ({@code start}, {@code signal}, {@code fail}, {@code kill},
 * {@code suspend}, {@code resume}) are {@code synchronized}; other accessors are not.
 */
public class LiteWorkflowInstance implements Writable, WorkflowInstance {
    // Suffix of the per-node persistent variable that records the node's outgoing transition.
    private static final String TRANSITION_TO = "transition.to";

    private XLog log = XLog.getLog(getClass());

    private static final String PATH_SEPARATOR = "/";
    private static final String ROOT = PATH_SEPARATOR;
    private static final String TRANSITION_SEPARATOR = "#";

    /**
     * Mutable record of the node occupying one execution path: the node name plus whether
     * its handler's {@code enter()} has already run.
     */
    private static class NodeInstance {
        String nodeName;
        boolean started = false;

        private NodeInstance(String nodeName) {
            this.nodeName = nodeName;
        }
    }

    /**
     * Per-signal view handed to a {@link NodeHandler}: exposes the node definition, the
     * execution path being signaled, the signal value, and read/write access to the
     * instance's variables. The handler reports its outcome by setting {@link #status}.
     */
    private class Context implements NodeHandler.Context {
        private NodeDef nodeDef;
        private String executionPath;
        private String exitState;
        private Status status = Status.RUNNING;

        private Context(NodeDef nodeDef, String executionPath, String exitState) {
            this.nodeDef = nodeDef;
            this.executionPath = executionPath;
            this.exitState = exitState;
        }

        public NodeDef getNodeDef() {
            return nodeDef;
        }

        public String getExecutionPath() {
            return executionPath;
        }

        public String getParentExecutionPath(String executionPath) {
            return LiteWorkflowInstance.getParentPath(executionPath);
        }

        public String getSignalValue() {
            return exitState;
        }

        public String createExecutionPath(String name) {
            return LiteWorkflowInstance.createChildPath(executionPath, name);
        }

        public String createFullTransition(String executionPath, String transition) {
            return LiteWorkflowInstance.createFullTransition(executionPath, transition);
        }

        /**
         * Removes this context's execution path from the instance (used when a forked path
         * joins) and moves the context up to the parent path.
         *
         * @throws IllegalStateException if the path is not currently registered.
         */
        public void deleteExecutionPath() {
            if (!executionPaths.containsKey(executionPath)) {
                throw new IllegalStateException();
            }
            executionPaths.remove(executionPath);
            executionPath = LiteWorkflowInstance.getParentPath(executionPath);
        }

        public void failJob() {
            status = Status.FAILED;
        }

        public void killJob() {
            status = Status.KILLED;
        }

        public void completeJob() {
            status = Status.SUCCEEDED;
        }

        @Override
        public Object getTransientVar(String name) {
            return LiteWorkflowInstance.this.getTransientVar(name);
        }

        @Override
        public String getVar(String name) {
            return LiteWorkflowInstance.this.getVar(name);
        }

        @Override
        public void setTransientVar(String name, Object value) {
            LiteWorkflowInstance.this.setTransientVar(name, value);
        }

        @Override
        public void setVar(String name, String value) {
            LiteWorkflowInstance.this.setVar(name, value);
        }

        @Override
        public LiteWorkflowInstance getProcessInstance() {
            return LiteWorkflowInstance.this;
        }

    }

    private LiteWorkflowApp def;
    private Configuration conf;
    private String instanceId;
    private Status status;
    // Execution path -> node currently occupying it. Persisted across restarts.
    private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
    // Persisted string variables (includes per-node "transition.to" entries).
    private Map<String, String> persistentVars = new HashMap<String, String>();
    // In-memory-only variables; lost on serialization.
    private Map<String, Object> transientVars = new HashMap<String, Object>();

    /**
     * No-arg constructor for {@link Writable} deserialization; state is populated by
     * {@link #readFields(DataInput)}.
     */
    protected LiteWorkflowInstance() {
        log = XLog.getLog(getClass());
    }

    /**
     * Creates a new workflow instance in {@link Status#PREP} state.
     *
     * @param def workflow application definition, not {@code null}.
     * @param conf job configuration, not {@code null}.
     * @param instanceId workflow instance (job) id, not {@code null}.
     */
    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
        this();
        this.def = ParamChecker.notNull(def, "def");
        this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
        this.conf = ParamChecker.notNull(conf, "conf");
        refreshLog();
        status = Status.PREP;
    }

    /**
     * Starts the instance: transitions PREP -> RUNNING, seeds the root execution path with
     * the start node and signals it.
     *
     * @return {@code true} if the workflow already reached an end state.
     * @throws WorkflowException with {@link ErrorCode#E0719} if the instance is not in PREP.
     */
    public synchronized boolean start() throws WorkflowException {
        if (status != Status.PREP) {
            throw new WorkflowException(ErrorCode.E0719);
        }
        log.debug(XLog.STD, "Starting job");
        status = Status.RUNNING;
        executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
        return signal(ROOT, StartNodeDef.START);
    }

    //todo if suspended store signal and use when resuming

    /**
     * Signals the node sitting on the given execution path. Enters the node's handler if it
     * has not started yet; if the handler exits, records the outgoing transition, creates
     * execution paths for every transition (forks produce several) and recursively signals
     * the new synchronous paths. If the workflow reaches an end state, remaining running
     * nodes are failed or killed accordingly.
     *
     * @param executionPath execution path to signal, not empty.
     * @param signalValue signal value handed to the node handler, not {@code null}.
     * @return {@code true} if the workflow reached an end state as a result of the signal.
     * @throws WorkflowException with {@link ErrorCode#E0716} if the instance is not RUNNING,
     *         or rethrown from the node handler (the instance is marked FAILED first).
     */
    public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
        ParamChecker.notEmpty(executionPath, "executionPath");
        ParamChecker.notNull(signalValue, "signalValue");

        if (status != Status.RUNNING) {
            throw new WorkflowException(ErrorCode.E0716);
        }

        NodeInstance nodeJob = executionPaths.get(executionPath);
        log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}] for node [{2}]", executionPath,
                signalValue, (nodeJob == null ? null : nodeJob.nodeName));
        if (nodeJob == null) {
            status = Status.FAILED;
            log.error("invalid execution path [{0}]", executionPath);
        }

        NodeDef nodeDef = null;
        if (!status.isEndState()) {
            nodeDef = def.getNode(nodeJob.nodeName);
            if (nodeDef == null) {
                status = Status.FAILED;
                log.error("invalid transition [{0}]", nodeJob.nodeName);
            }
        }

        if (!status.isEndState()) {
            NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
            boolean exiting = true;

            Context context = new Context(nodeDef, executionPath, signalValue);
            if (!nodeJob.started) {
                try {
                    nodeHandler.loopDetection(context);
                    // enter() returning true means the node completed synchronously and we
                    // should process its exit transitions right away.
                    exiting = nodeHandler.enter(context);
                    nodeJob.started = true;
                }
                catch (WorkflowException ex) {
                    status = Status.FAILED;
                    List<String> killedNodes = terminateNodes(Status.KILLED);
                    if (killedNodes.size() > 1) {
                        log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
                                .size());
                    }
                    throw ex;
                }
            }

            if (exiting) {
                List<String> pathsToStart = new ArrayList<String>();
                List<String> fullTransitions;
                try {
                    fullTransitions = nodeHandler.multiExit(context);
                    int last = fullTransitions.size() - 1;
                    // TEST THIS
                    if (last >= 0) {
                        String transitionTo = getTransitionNode(fullTransitions.get(last));
                        if (nodeDef instanceof ForkNodeDef) {
                            transitionTo = "*"; // WF action cannot hold all transitions for a fork.
                            // transitions are hardcoded in the WF app.
                        }
                        persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
                                transitionTo);
                    }
                }
                catch (WorkflowException ex) {
                    status = Status.FAILED;
                    throw ex;
                }

                if (context.status == Status.KILLED) {
                    status = Status.KILLED;
                    log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
                }
                else if (context.status == Status.FAILED) {
                    status = Status.FAILED;
                    log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
                }
                else if (context.status == Status.SUCCEEDED) {
                    status = Status.SUCCEEDED;
                    log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
                }
                else {
                    for (String fullTransition : fullTransitions) {
                        // This is the whole trick for forking: we need both the execution path
                        // and the transition. Without forking, the last element of the execution
                        // path differs from the transition; with forking they are the same.

                        log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
                                fullTransition);

                        String execPathFromTransition = getExecutionPath(fullTransition);
                        String transition = getTransitionNode(fullTransition);
                        def.validateTransition(nodeJob.nodeName, transition);

                        NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
                        if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
                            // If the WF job is signaled again with a parent execution path, the
                            // fork node would execute again and replace the node on that path;
                            // this guard prevents overwriting an existing, matching node.
                            // Question: should we throw an error in this case instead?
                            executionPaths.put(execPathFromTransition, new NodeInstance(transition));
                            pathsToStart.add(execPathFromTransition);
                        }

                    }

                    // signal all new synch transitions
                    for (String pathToStart : pathsToStart) {
                        signal(pathToStart, "::synch::");
                    }
                }
            }
        }

        if (status.isEndState()) {
            if (status == Status.FAILED) {
                List<String> failedNodes = terminateNodes(status);
                log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
                        .size());
            }
            else {
                List<String> killedNodes = terminateNodes(Status.KILLED);

                if (killedNodes.size() > 1) {
                    log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
                            .size());
                }
            }
        }

        return status.isEndState();
    }

    /**
     * Get NodeDef from workflow instance.
     *
     * @param executionPath execution path
     * @return node def, or {@code null} if the path or its node is unknown (errors are logged).
     */
    public NodeDef getNodeDef(String executionPath) {
        NodeInstance nodeJob = executionPaths.get(executionPath);
        NodeDef nodeDef = null;
        if (nodeJob == null) {
            log.error("invalid execution path [{0}]", executionPath);
        }
        else {
            nodeDef = def.getNode(nodeJob.nodeName);
            if (nodeDef == null) {
                log.error("invalid transition [{0}]", nodeJob.nodeName);
            }
        }
        return nodeDef;
    }

    /**
     * Fails the workflow: fails the first started occurrence of the named node, kills all
     * other running nodes and sets the instance status to FAILED.
     *
     * @param nodeName name of the node to fail.
     * @throws WorkflowException with {@link ErrorCode#E0718} if the instance already ended.
     */
    public synchronized void fail(String nodeName) throws WorkflowException {
        if (status.isEndState()) {
            throw new WorkflowException(ErrorCode.E0718);
        }
        String failedNode = failNode(nodeName);
        if (failedNode != null) {
            log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
        }
        else {
            //TODO failed attempting to fail the action. EXCEPTION
        }
        List<String> killedNodes = killNodes();
        if (killedNodes.size() > 1) {
            log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
        }
        status = Status.FAILED;
    }

    /**
     * Kills the workflow: kills all started nodes and sets the status to KILLED.
     *
     * @throws WorkflowException with {@link ErrorCode#E0718} if the instance already ended.
     */
    public synchronized void kill() throws WorkflowException {
        if (status.isEndState()) {
            throw new WorkflowException(ErrorCode.E0718);
        }
        log.debug(XLog.STD, "Killing job");
        List<String> killedNodes = killNodes();
        if (killedNodes.size() > 1) {
            log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
        }
        status = Status.KILLED;
    }

    /**
     * Suspends a RUNNING workflow.
     *
     * @throws WorkflowException with {@link ErrorCode#E0716} if the instance is not RUNNING.
     */
    public synchronized void suspend() throws WorkflowException {
        if (status != Status.RUNNING) {
            throw new WorkflowException(ErrorCode.E0716);
        }
        log.debug(XLog.STD, "Suspending job");
        this.status = Status.SUSPENDED;
    }

    public boolean isSuspended() {
        return (status == Status.SUSPENDED);
    }

    /**
     * Resumes a SUSPENDED workflow back to RUNNING.
     *
     * @throws WorkflowException with {@link ErrorCode#E0717} if the instance is not SUSPENDED.
     */
    public synchronized void resume() throws WorkflowException {
        if (status != Status.SUSPENDED) {
            throw new WorkflowException(ErrorCode.E0717);
        }
        log.debug(XLog.STD, "Resuming job");
        status = Status.RUNNING;
    }

    /**
     * Sets a persistent variable; a {@code null} value removes the variable.
     */
    public void setVar(String name, String value) {
        if (value != null) {
            persistentVars.put(name, value);
        }
        else {
            persistentVars.remove(name);
        }
    }

    @Override
    public Map<String, String> getAllVars() {
        return persistentVars;
    }

    @Override
    public void setAllVars(Map<String, String> varMap) {
        persistentVars.putAll(varMap);
    }

    public String getVar(String name) {
        return persistentVars.get(name);
    }

    /**
     * Sets a transient (non-persisted) variable; a {@code null} value removes it.
     */
    public void setTransientVar(String name, Object value) {
        if (value != null) {
            transientVars.put(name, value);
        }
        else {
            transientVars.remove(name);
        }
    }

    public boolean hasTransientVar(String name) {
        return transientVars.containsKey(name);
    }

    public Object getTransientVar(String name) {
        return transientVars.get(name);
    }

    public boolean hasEnded() {
        return status.isEndState();
    }

    /**
     * Kills (or fails, when {@code endStatus} is FAILED) every started non-control node.
     * Per-node errors are logged and do not stop the sweep.
     *
     * @param endStatus either {@link Status#KILLED} or {@link Status#FAILED}.
     * @return names of the nodes that were terminated.
     */
    private List<String> terminateNodes(Status endStatus) {
        List<String> endNodes = new ArrayList<String>();
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            if (entry.getValue().started) {
                NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
                if (!(nodeDef instanceof ControlNodeDef)) {
                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
                    try {
                        if (endStatus == Status.KILLED) {
                            nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
                        }
                        else {
                            if (endStatus == Status.FAILED) {
                                nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
                            }
                        }
                        endNodes.add(nodeDef.getName());
                    }
                    catch (Exception ex) {
                        log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
                                nodeDef.getName(), ex);
                    }
                }
            }
        }
        return endNodes;
    }

    /**
     * Fails the first started node with the given name.
     *
     * @param nodeName node name to look for.
     * @return the failed node's name, or {@code null} if no started node matched or the
     *         handler's fail call threw (the error is logged).
     */
    private String failNode(String nodeName) {
        String failedNode = null;
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            String node = entry.getKey();
            NodeInstance nodeInstance = entry.getValue();
            if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
                try {
                    nodeHandler.fail(new Context(nodeDef, node, null));
                    failedNode = nodeDef.getName();
                    nodeInstance.started = false;
                }
                catch (Exception ex) {
                    log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
                }
                // Only the first matching started node is failed.
                return failedNode;
            }
        }
        return failedNode;
    }

    /**
     * Kills every started node; per-node errors are logged and do not stop the sweep.
     *
     * @return names of the nodes whose handlers were killed successfully.
     */
    private List<String> killNodes() {
        List<String> killedNodes = new ArrayList<String>();
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            String node = entry.getKey();
            NodeInstance nodeInstance = entry.getValue();
            if (nodeInstance.started) {
                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
                try {
                    nodeHandler.kill(new Context(nodeDef, node, null));
                    killedNodes.add(nodeDef.getName());
                }
                catch (Exception ex) {
                    log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
                }
            }
        }
        return killedNodes;
    }

    public LiteWorkflowApp getProcessDefinition() {
        return def;
    }

    // Child paths always end with the path separator, e.g. "/" + "a" -> "/a/".
    private static String createChildPath(String path, String child) {
        return path + child + PATH_SEPARATOR;
    }

    // Returns the parent of a trailing-slash-terminated path, or null at the root.
    private static String getParentPath(String path) {
        path = path.substring(0, path.length() - 1);
        return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
    }

    // A "full transition" is "<executionPath>#<transitionNode>".
    private static String createFullTransition(String executionPath, String transition) {
        return executionPath + TRANSITION_SEPARATOR + transition;
    }

    private static String getExecutionPath(String fullTransition) {
        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
        if (index == -1) {
            throw new IllegalArgumentException("Invalid fullTransition");
        }
        return fullTransition.substring(0, index);
    }

    private static String getTransitionNode(String fullTransition) {
        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
        if (index == -1) {
            throw new IllegalArgumentException("Invalid fullTransition");
        }
        return fullTransition.substring(index + 1);
    }

    private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
        return ReflectionUtils.newInstance(handler, null);
    }

    /**
     * Pushes this instance's identifying information (user, group, app, token, job id) into
     * the thread-local log context and refreshes the logger.
     */
    private void refreshLog() {
        XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
        XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
        XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
        XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
        XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
        log = XLog.getLog(getClass());
    }

    public Status getStatus() {
        return status;
    }

    public void setStatus(Status status) {
        this.status = status;
    }

    @Override
    public void write(DataOutput dOut) throws IOException {

        dOut.writeUTF(instanceId);

        // Hadoop Configuration is not Writable itself; serialize it as a length-prefixed
        // XML byte array.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        conf.writeXml(baos);
        baos.close();
        byte[] array = baos.toByteArray();
        dOut.writeInt(array.length);
        dOut.write(array);

        def.write(dOut);
        dOut.writeUTF(status.toString());
        dOut.writeInt(executionPaths.size());
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            dOut.writeUTF(entry.getKey());
            dOut.writeUTF(entry.getValue().nodeName);
            dOut.writeBoolean(entry.getValue().started);
        }
        dOut.writeInt(persistentVars.size());
        for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
            dOut.writeUTF(entry.getKey());
            StringSerializationUtil.writeString(dOut, entry.getValue());
        }
    }

    @Override
    public void readFields(DataInput dIn) throws IOException {
        instanceId = dIn.readUTF();

        // Read the length-prefixed XML form of the configuration (see write()).
        int len = dIn.readInt();
        byte[] array = new byte[len];
        dIn.readFully(array);
        ByteArrayInputStream bais = new ByteArrayInputStream(array);
        conf = new XConfiguration(bais);

        def = new LiteWorkflowApp();
        def.readFields(dIn);
        status = Status.valueOf(dIn.readUTF());
        int numExPaths = dIn.readInt();
        for (int x = 0; x < numExPaths; x++) {
            String path = dIn.readUTF();
            String nodeName = dIn.readUTF();
            boolean isStarted = dIn.readBoolean();
            NodeInstance nodeInstance = new NodeInstance(nodeName);
            nodeInstance.started = isStarted;
            executionPaths.put(path, nodeInstance);
        }
        int numVars = dIn.readInt();
        for (int x = 0; x < numVars; x++) {
            String vName = dIn.readUTF();
            persistentVars.put(vName, StringSerializationUtil.readString(dIn));
        }
        refreshLog();
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public WorkflowApp getApp() {
        return def;
    }

    @Override
    public String getId() {
        return instanceId;
    }

    @Override
    public String getTransition(String node) {
        return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
    }

    @Override
    public boolean equals(Object o) {
        // Equality is based solely on the instance id.
        return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
    }

    @Override
    public int hashCode() {
        return instanceId.hashCode();
    }
}