/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.workflow.lite;

import org.apache.oozie.service.XLogService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.client.OozieClient;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.ErrorCode;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A "lite" workflow instance that executes a {@link LiteWorkflowApp}. Execution is driven by
 * signaling node transitions along execution paths; the instance tracks per-path node state
 * together with persistent and transient variables, and is serializable through the Hadoop
 * {@link Writable} interface.
 */
public class LiteWorkflowInstance implements Writable, WorkflowInstance {
    private static final String TRANSITION_TO = "transition.to";

    private XLog log;

    private static final String PATH_SEPARATOR = "/";
    private static final String ROOT = PATH_SEPARATOR;
    private static final String TRANSITION_SEPARATOR = "#";

    private static class NodeInstance {
        String nodeName;
        boolean started = false;

        private NodeInstance(String nodeName) {
            this.nodeName = nodeName;
        }
    }

    private class Context implements NodeHandler.Context {
        private NodeDef nodeDef;
        private String executionPath;
        private String exitState;
        private Status status = Status.RUNNING;

        private Context(NodeDef nodeDef, String executionPath, String exitState) {
            this.nodeDef = nodeDef;
            this.executionPath = executionPath;
            this.exitState = exitState;
        }

        public NodeDef getNodeDef() {
            return nodeDef;
        }

        public String getExecutionPath() {
            return executionPath;
        }

        public String getParentExecutionPath(String executionPath) {
            return LiteWorkflowInstance.getParentPath(executionPath);
        }

        public String getSignalValue() {
            return exitState;
        }

        public String createExecutionPath(String name) {
            return LiteWorkflowInstance.createChildPath(executionPath, name);
        }

        public String createFullTransition(String executionPath, String transition) {
            return LiteWorkflowInstance.createFullTransition(executionPath, transition);
        }

        public void deleteExecutionPath() {
            if (!executionPaths.containsKey(executionPath)) {
                throw new IllegalStateException();
            }
            executionPaths.remove(executionPath);
            executionPath = LiteWorkflowInstance.getParentPath(executionPath);
        }

        public void failJob() {
            status = Status.FAILED;
        }

        public void killJob() {
            status = Status.KILLED;
        }

        public void completeJob() {
            status = Status.SUCCEEDED;
        }

        @Override
        public Object getTransientVar(String name) {
            return LiteWorkflowInstance.this.getTransientVar(name);
        }

        @Override
        public String getVar(String name) {
            return LiteWorkflowInstance.this.getVar(name);
        }

        @Override
        public void setTransientVar(String name, Object value) {
            LiteWorkflowInstance.this.setTransientVar(name, value);
        }

        @Override
        public void setVar(String name, String value) {
            LiteWorkflowInstance.this.setVar(name, value);
        }

        @Override
        public LiteWorkflowInstance getProcessInstance() {
            return LiteWorkflowInstance.this;
        }

    }

    private LiteWorkflowApp def;
    private Configuration conf;
    private String instanceId;
    private Status status;
    private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
    private Map<String, String> persistentVars = new HashMap<String, String>();
    private Map<String, Object> transientVars = new HashMap<String, Object>();

    protected LiteWorkflowInstance() {
        log = XLog.getLog(getClass());
    }

    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
        this();
        this.def = ParamChecker.notNull(def, "def");
        this.instanceId = ParamChecker.notNull(instanceId, "instanceId");
        this.conf = ParamChecker.notNull(conf, "conf");
        refreshLog();
        status = Status.PREP;
    }

    public synchronized boolean start() throws WorkflowException {
        if (status != Status.PREP) {
            throw new WorkflowException(ErrorCode.E0719);
        }
        log.debug(XLog.STD, "Starting job");
        status = Status.RUNNING;
        executionPaths.put(ROOT, new NodeInstance(StartNodeDef.START));
        return signal(ROOT, StartNodeDef.START);
    }

    //TODO if suspended, store the signal and use it when resuming

    public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
        ParamChecker.notEmpty(executionPath, "executionPath");
        ParamChecker.notNull(signalValue, "signalValue");
        log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}]", executionPath, signalValue);
        if (status != Status.RUNNING) {
            throw new WorkflowException(ErrorCode.E0716);
        }
        NodeInstance nodeJob = executionPaths.get(executionPath);
        if (nodeJob == null) {
            status = Status.FAILED;
            log.error("invalid execution path [{0}]", executionPath);
        }
        NodeDef nodeDef = null;
        if (!status.isEndState()) {
            nodeDef = def.getNode(nodeJob.nodeName);
            if (nodeDef == null) {
                status = Status.FAILED;
                log.error("invalid transition [{0}]", nodeJob.nodeName);
            }
        }
        if (!status.isEndState()) {
            NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
            boolean exiting = true;

            Context context = new Context(nodeDef, executionPath, signalValue);
            if (!nodeJob.started) {
                try {
                    nodeHandler.loopDetection(context);
                    exiting = nodeHandler.enter(context);
                    nodeJob.started = true;
                }
                catch (WorkflowException ex) {
                    status = Status.FAILED;
                    throw ex;
                }
            }

            if (exiting) {
                List<String> pathsToStart = new ArrayList<String>();
                List<String> fullTransitions;
                try {
                    fullTransitions = nodeHandler.multiExit(context);
                    int last = fullTransitions.size() - 1;
                    // TEST THIS
                    if (last >= 0) {
                        String transitionTo = getTransitionNode(fullTransitions.get(last));
                        if (nodeDef instanceof ForkNodeDef) {
                            // A WF action cannot hold all transitions for a fork;
                            // the transitions are hardcoded in the WF app.
                            transitionTo = "*";
                        }
                        persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
                                           transitionTo);
                    }
                }
                catch (WorkflowException ex) {
                    status = Status.FAILED;
                    throw ex;
                }

                if (context.status == Status.KILLED) {
                    status = Status.KILLED;
                    log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
                }
                else {
                    if (context.status == Status.FAILED) {
                        status = Status.FAILED;
                        log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
                    }
                    else {
                        if (context.status == Status.SUCCEEDED) {
                            status = Status.SUCCEEDED;
                            log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
                        }
                        /*
                        else if (context.status == Status.SUSPENDED) {
                            status = Status.SUSPENDED;
                            log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
                        }
                        */
                        else {
                            for (String fullTransition : fullTransitions) {
                                // This is the key to forking: we need both the execution path and
                                // the transition. Without a fork, the last element of the execution
                                // path differs from the transition; with a fork they are the same.

                                log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
                                          fullTransition);

                                String execPathFromTransition = getExecutionPath(fullTransition);
                                String transition = getTransitionNode(fullTransition);
                                def.validateTransition(nodeJob.nodeName, transition);

                                NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
                                if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
                                    // TODO explain this IF better
                                    // If the WF job is signaled again with the parent execution path,
                                    // the fork node will execute again and replace the node's
                                    // WorkflowJobBean, so this check is required to prevent that.
                                    // Question: should we throw an error in this case?
                                    executionPaths.put(execPathFromTransition, new NodeInstance(transition));
                                    pathsToStart.add(execPathFromTransition);
                                }

                            }
                            // signal all new synch transitions
                            for (String pathToStart : pathsToStart) {
                                signal(pathToStart, "::synch::");
                            }
                        }
                    }
                }
            }
        }
        if (status.isEndState()) {
            if (status == Status.FAILED) {
                List<String> failedNodes = terminateNodes(status);
                log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status,
                         failedNodes.size());
            }
            else {
                List<String> killedNodes = terminateNodes(Status.KILLED);

                if (killedNodes.size() > 1) {
                    log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status,
                             killedNodes.size());
                }
            }
        }
        return status.isEndState();
    }

    /**
     * Get the NodeDef for the given execution path of this workflow instance.
     *
     * @param executionPath execution path
     * @return the node definition, or <code>null</code> if the execution path or node is invalid
     */
    public NodeDef getNodeDef(String executionPath) {
        NodeInstance nodeJob = executionPaths.get(executionPath);
        NodeDef nodeDef = null;
        if (nodeJob == null) {
            log.error("invalid execution path [{0}]", executionPath);
        }
        else {
            nodeDef = def.getNode(nodeJob.nodeName);
            if (nodeDef == null) {
                log.error("invalid transition [{0}]", nodeJob.nodeName);
            }
        }
        return nodeDef;
    }

    public synchronized void fail(String nodeName) throws WorkflowException {
        if (status.isEndState()) {
            throw new WorkflowException(ErrorCode.E0718);
        }
        String failedNode = failNode(nodeName);
        if (failedNode != null) {
            log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
        }
        else {
            //TODO failed attempting to fail the action. EXCEPTION
        }
        List<String> killedNodes = killNodes();
        if (killedNodes.size() > 1) {
            log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
        }
        status = Status.FAILED;
    }

    public synchronized void kill() throws WorkflowException {
        if (status.isEndState()) {
            throw new WorkflowException(ErrorCode.E0718);
        }
        log.debug(XLog.STD, "Killing job");
        List<String> killedNodes = killNodes();
        if (killedNodes.size() > 1) {
            log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedNodes.size());
        }
        status = Status.KILLED;
    }

    public synchronized void suspend() throws WorkflowException {
        if (status != Status.RUNNING) {
            throw new WorkflowException(ErrorCode.E0716);
        }
        log.debug(XLog.STD, "Suspending job");
        this.status = Status.SUSPENDED;
    }

    public boolean isSuspended() {
        return (status == Status.SUSPENDED);
    }

    public synchronized void resume() throws WorkflowException {
        if (status != Status.SUSPENDED) {
            throw new WorkflowException(ErrorCode.E0717);
        }
        log.debug(XLog.STD, "Resuming job");
        status = Status.RUNNING;
    }

    public void setVar(String name, String value) {
        if (value != null) {
            persistentVars.put(name, value);
        }
        else {
            persistentVars.remove(name);
        }
    }

    @Override
    public Map<String, String> getAllVars() {
        return persistentVars;
    }

    @Override
    public void setAllVars(Map<String, String> varMap) {
        persistentVars.putAll(varMap);
    }

    public String getVar(String name) {
        return persistentVars.get(name);
    }

    public void setTransientVar(String name, Object value) {
        if (value != null) {
            transientVars.put(name, value);
        }
        else {
            transientVars.remove(name);
        }
    }

    public boolean hasTransientVar(String name) {
        return transientVars.containsKey(name);
    }

    public Object getTransientVar(String name) {
        return transientVars.get(name);
    }

    public boolean hasEnded() {
        return status.isEndState();
    }

    private List<String> terminateNodes(Status endStatus) {
        List<String> endNodes = new ArrayList<String>();
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            if (entry.getValue().started) {
                NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
                if (!(nodeDef instanceof ControlNodeDef)) {
                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
                    try {
                        if (endStatus == Status.KILLED) {
                            nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
                        }
                        else {
                            if (endStatus == Status.FAILED) {
                                nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
                            }
                        }
                        endNodes.add(nodeDef.getName());
                    }
                    catch (Exception ex) {
                        log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
                                 nodeDef.getName(), ex);
                    }
                }
            }
        }
        return endNodes;
    }

    private String failNode(String nodeName) {
        String failedNode = null;
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            String node = entry.getKey();
            NodeInstance nodeInstance = entry.getValue();
            if (nodeInstance.started && nodeInstance.nodeName.equals(nodeName)) {
                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
                try {
                    nodeHandler.fail(new Context(nodeDef, node, null));
                    failedNode = nodeDef.getName();
                    nodeInstance.started = false;
                }
                catch (Exception ex) {
                    log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
                }
                return failedNode;
            }
        }
        return failedNode;
    }

    private List<String> killNodes() {
        List<String> killedNodes = new ArrayList<String>();
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            String node = entry.getKey();
            NodeInstance nodeInstance = entry.getValue();
            if (nodeInstance.started) {
                NodeDef nodeDef = def.getNode(nodeInstance.nodeName);
                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
                try {
                    nodeHandler.kill(new Context(nodeDef, node, null));
                    killedNodes.add(nodeDef.getName());
                }
                catch (Exception ex) {
                    log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
                }
            }
        }
        return killedNodes;
    }

    public LiteWorkflowApp getProcessDefinition() {
        return def;
    }

    private static String createChildPath(String path, String child) {
        return path + child + PATH_SEPARATOR;
    }

    private static String getParentPath(String path) {
        path = path.substring(0, path.length() - 1);
        return (path.length() == 0) ?
                null : path.substring(0, path.lastIndexOf(PATH_SEPARATOR) + 1);
    }

    private static String createFullTransition(String executionPath, String transition) {
        return executionPath + TRANSITION_SEPARATOR + transition;
    }

    private static String getExecutionPath(String fullTransition) {
        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
        if (index == -1) {
            throw new IllegalArgumentException("Invalid fullTransition");
        }
        return fullTransition.substring(0, index);
    }

    private static String getTransitionNode(String fullTransition) {
        int index = fullTransition.indexOf(TRANSITION_SEPARATOR);
        if (index == -1) {
            throw new IllegalArgumentException("Invalid fullTransition");
        }
        return fullTransition.substring(index + 1);
    }

    private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
        return (NodeHandler) ReflectionUtils.newInstance(handler, null);
    }

    private void refreshLog() {
        XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
        XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
        XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
        XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
        XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
        log = XLog.getLog(getClass());
    }

    public Status getStatus() {
        return status;
    }

    public void setStatus(Status status) {
        this.status = status;
    }

    @Override
    public void write(DataOutput dOut) throws IOException {
        dOut.writeUTF(instanceId);

        //Hadoop Configuration has to get its act right
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        conf.writeXml(baos);
        baos.close();
        byte[] array = baos.toByteArray();
        dOut.writeInt(array.length);
        dOut.write(array);

        def.write(dOut);
        dOut.writeUTF(status.toString());
        dOut.writeInt(executionPaths.size());
        for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
            dOut.writeUTF(entry.getKey());
            dOut.writeUTF(entry.getValue().nodeName);
            dOut.writeBoolean(entry.getValue().started);
        }
        dOut.writeInt(persistentVars.size());
        for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
            dOut.writeUTF(entry.getKey());
            dOut.writeUTF(entry.getValue());
        }
    }

    @Override
    public void readFields(DataInput dIn) throws IOException {
        instanceId = dIn.readUTF();

        //Hadoop Configuration has to get its act right
        int len = dIn.readInt();
        byte[] array = new byte[len];
        dIn.readFully(array);
        ByteArrayInputStream bais = new ByteArrayInputStream(array);
        conf = new XConfiguration(bais);

        def = new LiteWorkflowApp();
        def.readFields(dIn);
        status = Status.valueOf(dIn.readUTF());
        int numExPaths = dIn.readInt();
        for (int x = 0; x < numExPaths; x++) {
            String path = dIn.readUTF();
            String nodeName = dIn.readUTF();
            boolean isStarted = dIn.readBoolean();
            NodeInstance nodeInstance = new NodeInstance(nodeName);
            nodeInstance.started = isStarted;
            executionPaths.put(path, nodeInstance);
        }
        int numVars = dIn.readInt();
        for (int x = 0; x < numVars; x++) {
            String vName = dIn.readUTF();
            String vVal = dIn.readUTF();
            persistentVars.put(vName, vVal);
        }
        refreshLog();
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public WorkflowApp getApp() {
        return def;
    }

    @Override
    public String getId() {
        return instanceId;
    }

    @Override
    public String getTransition(String node) {
        return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
    }

    @Override
    public boolean equals(Object o) {
        return (o != null) && (getClass().isInstance(o)) && ((WorkflowInstance) o).getId().equals(instanceId);
    }

    @Override
    public int hashCode() {
        return instanceId.hashCode();
    }

}