001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.workflow.lite; 019 020 import org.apache.oozie.service.XLogService; 021 import org.apache.oozie.service.DagXLogInfoService; 022 import org.apache.oozie.client.OozieClient; 023 import org.apache.hadoop.io.Writable; 024 import org.apache.hadoop.util.ReflectionUtils; 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.oozie.workflow.WorkflowApp; 027 import org.apache.oozie.workflow.WorkflowException; 028 import org.apache.oozie.workflow.WorkflowInstance; 029 import org.apache.oozie.util.ParamChecker; 030 import org.apache.oozie.util.XLog; 031 import org.apache.oozie.util.XConfiguration; 032 import org.apache.oozie.ErrorCode; 033 034 import java.io.DataInput; 035 import java.io.DataOutput; 036 import java.io.IOException; 037 import java.io.ByteArrayOutputStream; 038 import java.io.ByteArrayInputStream; 039 import java.util.ArrayList; 040 import java.util.HashMap; 041 import java.util.List; 042 import java.util.Map; 043 044 //TODO javadoc 045 public class LiteWorkflowInstance implements Writable, WorkflowInstance { 046 private static final String TRANSITION_TO = "transition.to"; 047 048 private XLog log; 049 050 private static String PATH_SEPARATOR = "/"; 051 private static String ROOT = PATH_SEPARATOR; 052 private static String TRANSITION_SEPARATOR = "#"; 053 054 private static class NodeInstance { 055 String nodeName; 056 boolean started = false; 057 058 private NodeInstance(String nodeName) { 059 this.nodeName = nodeName; 060 } 061 } 062 063 private class Context implements NodeHandler.Context { 064 private NodeDef nodeDef; 065 private String executionPath; 066 private String exitState; 067 private Status status = Status.RUNNING; 068 069 private Context(NodeDef nodeDef, String executionPath, String exitState) { 070 this.nodeDef = nodeDef; 071 this.executionPath = executionPath; 072 this.exitState = exitState; 073 } 074 075 public NodeDef getNodeDef() { 076 return nodeDef; 077 } 078 079 public String getExecutionPath() { 080 return executionPath; 081 } 082 083 public String getParentExecutionPath(String executionPath) { 084 return LiteWorkflowInstance.getParentPath(executionPath); 085 } 086 087 public String getSignalValue() { 088 return exitState; 089 } 090 091 public String createExecutionPath(String name) { 092 return LiteWorkflowInstance.createChildPath(executionPath, name); 093 } 094 095 public String createFullTransition(String executionPath, String transition) { 096 return LiteWorkflowInstance.createFullTransition(executionPath, transition); 097 } 098 099 public void deleteExecutionPath() { 100 if (!executionPaths.containsKey(executionPath)) { 101 throw new IllegalStateException(); 102 } 103 executionPaths.remove(executionPath); 104 executionPath = LiteWorkflowInstance.getParentPath(executionPath); 105 } 106 107 public void failJob() { 108 status = Status.FAILED; 109 } 110 111 public void killJob() { 112 status = Status.KILLED; 113 } 114 115 public void completeJob() { 116 status = Status.SUCCEEDED; 117 } 118 119 @Override 120 public Object getTransientVar(String name) { 121 return LiteWorkflowInstance.this.getTransientVar(name); 122 } 123 124 @Override 125 public String getVar(String name) { 126 return LiteWorkflowInstance.this.getVar(name); 127 } 128 129 @Override 130 public void setTransientVar(String name, Object value) { 131 LiteWorkflowInstance.this.setTransientVar(name, value); 132 } 133 134 @Override 135 public void setVar(String name, String value) { 136 LiteWorkflowInstance.this.setVar(name, value); 137 } 138 139 @Override 140 public LiteWorkflowInstance getProcessInstance() { 141 return LiteWorkflowInstance.this; 142 } 143 144 } 145 146 private LiteWorkflowApp def; 147 private Configuration conf; 148 private String instanceId; 149 private Status status; 150 private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>(); 151 private Map<String, String> persistentVars = new HashMap<String, String>(); 152 private Map<String, Object> transientVars = new HashMap<String, Object>(); 153 154 protected LiteWorkflowInstance() { 155 log = XLog.getLog(getClass()); 156 } 157 158 public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) { 159 this(); 160 this.def = ParamChecker.notNull(def, "def"); 161 this.instanceId = ParamChecker.notNull(instanceId, "instanceId"); 162 this.conf = ParamChecker.notNull(conf, "conf"); 163 refreshLog(); 164 status = Status.PREP; 165 } 166 167 public synchronized boolean start() throws WorkflowException { 168 if (status != Status.PREP) { 169 throw new WorkflowException(ErrorCode.E0719); 170 } 171 log.debug(XLog.STD, "Starting job"); 172 status = Status.RUNNING; 173 executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START)); 174 return signal(ROOT, StartNodeDef.START); 175 } 176 177 //todo if suspended store signal and use when resuming 178 179 public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException { 180 ParamChecker.notEmpty(executionPath, "executionPath"); 181 ParamChecker.notNull(signalValue, "signalValue"); 182 log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue); 183 if (status != Status.RUNNING) { 184 throw new WorkflowException(ErrorCode.E0716); 185 } 186 NodeInstance nodeJob = executionPaths.get(executionPath); 187 if (nodeJob == null) { 188 status = Status.FAILED; 189 log.error("invalid execution path [{0}]", executionPath); 190 } 191 NodeDef nodeDef = null; 192 if (!status.isEndState()) { 193 nodeDef = def.getNode(nodeJob.nodeName); 194 if (nodeDef == null) { 195 status = Status.FAILED; 196 log.error("invalid transition [{0}]", nodeJob.nodeName); 197 } 198 } 199 if (!status.isEndState()) { 200 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass()); 201 boolean exiting = true; 202 203 Context context = new Context(nodeDef, executionPath, signalValue); 204 if (!nodeJob.started) { 205 try { 206 nodeHandler.loopDetection(context); 207 exiting = nodeHandler.enter(context); 208 nodeJob.started = true; 209 } 210 catch (WorkflowException ex) { 211 status = Status.FAILED; 212 throw ex; 213 } 214 } 215 216 if (exiting) { 217 List<String> pathsToStart = new ArrayList<String>(); 218 List<String> fullTransitions; 219 try { 220 fullTransitions = nodeHandler.multiExit(context); 221 int last = fullTransitions.size() - 1; 222 // TEST THIS 223 if (last >= 0) { 224 String transitionTo = getTransitionNode(fullTransitions.get(last)); 225 226 persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO, 227 transitionTo); 228 } 229 } 230 catch (WorkflowException ex) { 231 status = Status.FAILED; 232 throw ex; 233 } 234 235 if (context.status == Status.KILLED) { 236 status = Status.KILLED; 237 log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName); 238 } 239 else { 240 if (context.status == Status.FAILED) { 241 status = Status.FAILED; 242 log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName); 243 } 244 else { 245 if (context.status == Status.SUCCEEDED) { 246 status = Status.SUCCEEDED; 247 log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName); 248 } 249 /* 250 else if (context.status == Status.SUSPENDED) { 251 status = Status.SUSPENDED; 252 log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName); 253 } 254 */ 255 else { 256 for (String fullTransition : fullTransitions) { 257 // this is the whole trick for forking, we need the 258 // executionpath and the transition 259 // in the case of no forking last element of 260 // executionpath is different from transition 261 // in the case of forking they are the same 262 263 log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName, 264 fullTransition); 265 266 String execPathFromTransition = getExecutionPath(fullTransition); 267 String transition = getTransitionNode(fullTransition); 268 def.validateTransition(nodeJob.nodeName, transition); 269 270 NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition); 271 if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) { 272 // TODO explain this IF better 273 // If the WfJob is signaled with the parent 274 // execution executionPath again 275 // The Fork node will execute again.. and replace 276 // the Node WorkflowJobBean 277 // so this is required to prevent that.. 278 // Question : Should we throw an error in this case 279 // ?? 280 executionPaths.put(execPathFromTransition, new NodeInstance(transition)); 281 pathsToStart.add(execPathFromTransition); 282 } 283 284 } 285 // signal all new synch transitions 286 for (String pathToStart : pathsToStart) { 287 signal(pathToStart, "::synch::"); 288 } 289 } 290 } 291 } 292 } 293 } 294 if (status.isEndState()) { 295 if (status == Status.FAILED) { 296 List<String> failedNodes = terminateNodes(status); 297 log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes 298 .size()); 299 } 300 else { 301 List<String> killedNodes = terminateNodes(Status.KILLED); 302 if (killedNodes.size() > 1) { 303 log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes 304 .size()); 305 } 306 } 307 } 308 return status.isEndState(); 309 } 310 311 /** 312 * Get NodeDef from workflow instance 313 * @param executionPath execution path 314 * @return node def 315 */ 316 public NodeDef getNodeDef(String executionPath) { 317 NodeInstance nodeJob = executionPaths.get(executionPath); 318 NodeDef nodeDef = null; 319 if (nodeJob == null) { 320 log.error("invalid execution path [{0}]", executionPath); 321 } 322 else { 323 nodeDef = def.getNode(nodeJob.nodeName); 324 if (nodeDef == null) { 325 log.error("invalid transition [{0}]", nodeJob.nodeName); 326 } 327 } 328 return nodeDef; 329 } 330 331 public synchronized void fail(String nodeName) throws WorkflowException { 332 if (status.isEndState()) { 333 throw new WorkflowException(ErrorCode.E0718); 334 } 335 String failedNode = failNode(nodeName); 336 if (failedNode != null) { 337 log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode); 338 } 339 else { 340 //TODO failed attempting to fail the action. EXCEPTION 341 } 342 List<String> killedNodes = killNodes(); 343 if (killedNodes.size() > 1) { 344 log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size()); 345 } 346 status = Status.FAILED; 347 } 348 349 public synchronized void kill() throws WorkflowException { 350 if (status.isEndState()) { 351 throw new WorkflowException(ErrorCode.E0718); 352 } 353 log.debug(XLog.STD, "Killing job"); 354 List<String> killedNodes = killNodes(); 355 if (killedNodes.size() > 1) { 356 log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size()); 357 } 358 status = Status.KILLED; 359 } 360 361 public synchronized void suspend() throws WorkflowException { 362 if (status != Status.RUNNING) { 363 throw new WorkflowException(ErrorCode.E0716); 364 } 365 log.debug(XLog.STD, "Suspending job"); 366 this.status = Status.SUSPENDED; 367 } 368 369 public boolean isSuspended() { 370 return (status == Status.SUSPENDED); 371 } 372 373 public synchronized void resume() throws WorkflowException { 374 if (status != Status.SUSPENDED) { 375 throw new WorkflowException(ErrorCode.E0717); 376 } 377 log.debug(XLog.STD, "Resuming job"); 378 status = Status.RUNNING; 379 } 380 381 public void setVar(String name, String value) { 382 if (value != null) { 383 persistentVars.put(name, value); 384 } 385 else { 386 persistentVars.remove(name); 387 } 388 } 389 390 @Override 391 public Map<String, String> getAllVars() { 392 return persistentVars; 393 } 394 395 @Override 396 public void setAllVars(Map<String, String> varMap) { 397 persistentVars.putAll(varMap); 398 } 399 400 public String getVar(String name) { 401 return persistentVars.get(name); 402 } 403 404 405 public void setTransientVar(String name, Object value) { 406 if (value != null) { 407 transientVars.put(name, value); 408 } 409 else { 410 transientVars.remove(name); 411 } 412 } 413 414 public boolean hasTransientVar(String name) { 415 return transientVars.containsKey(name); 416 } 417 418 public Object getTransientVar(String name) { 419 return transientVars.get(name); 420 } 421 422 public boolean hasEnded() { 423 return status.isEndState(); 424 } 425 426 private List<String> terminateNodes(Status endStatus) { 427 List<String> endNodes = new ArrayList<String>(); 428 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) { 429 if (entry.getValue().started) { 430 NodeDef nodeDef = def.getNode(entry.getValue().nodeName); 431 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass()); 432 try { 433 if (endStatus == Status.KILLED) { 434 nodeHandler.kill(new Context(nodeDef, entry.getKey(), null)); 435 } 436 else { 437 if (endStatus == Status.FAILED) { 438 nodeHandler.fail(new Context(nodeDef, entry.getKey(), null)); 439 } 440 } 441 endNodes.add(nodeDef.getName()); 442 } 443 catch (Exception ex) { 444 log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(), 445 nodeDef.getName(), ex); 446 } 447 } 448 } 449 return endNodes; 450 } 451 452 private String failNode(String nodeName) { 453 String failedNode = null; 454 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) { 455 String node = entry.getKey(); 456 NodeInstance nodeInstance = entry.getValue(); 457 if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) { 458 NodeDef nodeDef = def.getNode(nodeInstance.nodeName); 459 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass()); 460 try { 461 nodeHandler.fail(new Context(nodeDef, node, null)); 462 failedNode = nodeDef.getName(); 463 nodeInstance.started = false; 464 } 465 catch (Exception ex) { 466 log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex); 467 } 468 return failedNode; 469 } 470 } 471 return failedNode; 472 } 473 474 private List<String> killNodes() { 475 List<String> killedNodes = new ArrayList<String>(); 476 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) { 477 String node = entry.getKey(); 478 NodeInstance nodeInstance = entry.getValue(); 479 if (nodeInstance.started) { 480 NodeDef nodeDef = def.getNode(nodeInstance.nodeName); 481 NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass()); 482 try { 483 nodeHandler.kill(new Context(nodeDef, node, null)); 484 killedNodes.add(nodeDef.getName()); 485 } 486 catch (Exception ex) { 487 log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex); 488 } 489 } 490 } 491 return killedNodes; 492 } 493 494 public LiteWorkflowApp getProcessDefinition() { 495 return def; 496 } 497 498 private static String createChildPath(String path, String child) { 499 return path + child + PATH_SEPARATOR; 500 } 501 502 private static String getParentPath(String path) { 503 path = path.substring(0, path.length() - 1); 504 return (path.length() == 0) ? null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1); 505 } 506 507 private static String createFullTransition(String executionPath, String transition) { 508 return executionPath + TRANSITION_SEPARATOR + transition; 509 } 510 511 private static String getExecutionPath(String fullTransition) { 512 int index = fullTransition.indexOf(TRANSITION_SEPARATOR); 513 if (index == -1) { 514 throw new IllegalArgumentException("Invalid fullTransition"); 515 } 516 return fullTransition.substring(0, index); 517 } 518 519 private static String getTransitionNode(String fullTransition) { 520 int index = fullTransition.indexOf(TRANSITION_SEPARATOR); 521 if (index == -1) { 522 throw new IllegalArgumentException("Invalid fullTransition"); 523 } 524 return fullTransition.substring(index + 1); 525 } 526 527 private NodeHandler newInstance(Class<? extends NodeHandler> handler) { 528 return (NodeHandler) ReflectionUtils.newInstance(handler, null); 529 } 530 531 private void refreshLog() { 532 XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME)); 533 XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME)); 534 XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName()); 535 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, "")); 536 XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId); 537 log = XLog.getLog(getClass()); 538 } 539 540 public Status getStatus() { 541 return status; 542 } 543 544 public void setStatus(Status status) { 545 this.status = status; 546 } 547 548 @Override 549 public void write(DataOutput dOut) throws IOException { 550 dOut.writeUTF(instanceId); 551 552 //Hadoop Configuration has to get its act right 553 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 554 conf.writeXml(baos); 555 baos.close(); 556 byte[] array = baos.toByteArray(); 557 dOut.writeInt(array.length); 558 dOut.write(array); 559 560 def.write(dOut); 561 dOut.writeUTF(status.toString()); 562 dOut.writeInt(executionPaths.size()); 563 for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) { 564 dOut.writeUTF(entry.getKey()); 565 dOut.writeUTF(entry.getValue().nodeName); 566 dOut.writeBoolean(entry.getValue().started); 567 } 568 dOut.writeInt(persistentVars.size()); 569 for (Map.Entry<String, String> entry : persistentVars.entrySet()) { 570 dOut.writeUTF(entry.getKey()); 571 dOut.writeUTF(entry.getValue()); 572 } 573 } 574 575 @Override 576 public void readFields(DataInput dIn) throws IOException { 577 instanceId = dIn.readUTF(); 578 579 //Hadoop Configuration has to get its act right 580 int len = dIn.readInt(); 581 byte[] array = new byte[len]; 582 dIn.readFully(array); 583 ByteArrayInputStream bais = new ByteArrayInputStream(array); 584 conf = new XConfiguration(bais); 585 586 def = new LiteWorkflowApp(); 587 def.readFields(dIn); 588 status = Status.valueOf(dIn.readUTF()); 589 int numExPaths = dIn.readInt(); 590 for (int x = 0; x < numExPaths; x++) { 591 String path = dIn.readUTF(); 592 String nodeName = dIn.readUTF(); 593 boolean isStarted = dIn.readBoolean(); 594 NodeInstance nodeInstance = new NodeInstance(nodeName); 595 nodeInstance.started = isStarted; 596 executionPaths.put(path, nodeInstance); 597 } 598 int numVars = dIn.readInt(); 599 for (int x = 0; x < numVars; x++) { 600 String vName = dIn.readUTF(); 601 String vVal = dIn.readUTF(); 602 persistentVars.put(vName, vVal); 603 } 604 refreshLog(); 605 } 606 607 @Override 608 public Configuration getConf() { 609 return conf; 610 } 611 612 @Override 613 public WorkflowApp getApp() { 614 return def; 615 } 616 617 @Override 618 public String getId() { 619 return instanceId; 620 } 621 622 @Override 623 public String getTransition(String node) { 624 return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO); 625 } 626 627 @Override 628 public boolean equals(Object o) { 629 return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId); 630 } 631 632 @Override 633 public int hashCode() { 634 return instanceId.hashCode(); 635 } 636 637 }