001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.workflow.lite; 019 020import org.apache.oozie.workflow.WorkflowException; 021import org.apache.oozie.util.ELUtils; 022import org.apache.oozie.util.IOUtils; 023import org.apache.oozie.util.XConfiguration; 024import org.apache.oozie.util.XmlUtils; 025import org.apache.oozie.util.ParamChecker; 026import org.apache.oozie.util.ParameterVerifier; 027import org.apache.oozie.util.ParameterVerifierException; 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.action.ActionExecutor; 030import org.apache.oozie.service.Services; 031import org.apache.oozie.service.ActionService; 032import org.apache.commons.lang.StringUtils; 033import org.apache.hadoop.conf.Configuration; 034import org.jdom.Element; 035import org.jdom.JDOMException; 036import org.jdom.Namespace; 037import org.xml.sax.SAXException; 038 039import javax.xml.transform.stream.StreamSource; 040import javax.xml.validation.Schema; 041import javax.xml.validation.Validator; 042 043import java.io.IOException; 044import java.io.Reader; 045import java.io.StringReader; 046import java.io.StringWriter; 047import java.util.ArrayList; 048import java.util.Arrays; 049import java.util.Deque; 050import java.util.HashMap; 051import java.util.HashSet; 052import java.util.LinkedList; 053import java.util.List; 054import java.util.Map; 055 056/** 057 * Class to parse and validate workflow xml 058 */ 059public class LiteWorkflowAppParser { 060 061 private static final String DECISION_E = "decision"; 062 private static final String ACTION_E = "action"; 063 private static final String END_E = "end"; 064 private static final String START_E = "start"; 065 private static final String JOIN_E = "join"; 066 private static final String FORK_E = "fork"; 067 private static final Object KILL_E = "kill"; 068 069 private static final String SLA_INFO = "info"; 070 private static final String CREDENTIALS = "credentials"; 071 private static final String GLOBAL = "global"; 072 private static final String PARAMETERS = "parameters"; 073 074 private static final String NAME_A = "name"; 075 private static final String CRED_A = "cred"; 076 private static final String USER_RETRY_MAX_A = "retry-max"; 077 private static final String USER_RETRY_INTERVAL_A = "retry-interval"; 078 private static final String TO_A = "to"; 079 080 private static final String FORK_PATH_E = "path"; 081 private static final String FORK_START_A = "start"; 082 083 private static final String ACTION_OK_E = "ok"; 084 private static final String ACTION_ERROR_E = "error"; 085 086 private static final String DECISION_SWITCH_E = "switch"; 087 private static final String DECISION_CASE_E = "case"; 088 private static final String DECISION_DEFAULT_E = "default"; 089 090 private static final String KILL_MESSAGE_E = "message"; 091 public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin"; 092 public static final String WF_VALIDATE_FORK_JOIN = "oozie.wf.validate.ForkJoin"; 093 094 private Schema schema; 095 private Class<? extends ControlNodeHandler> controlNodeHandler; 096 private Class<? extends DecisionNodeHandler> decisionHandlerClass; 097 private Class<? extends ActionNodeHandler> actionHandlerClass; 098 099 private static enum VisitStatus { 100 VISITING, VISITED 101 } 102 103 /** 104 * We use this to store a node name and its top (eldest) decision parent node name for the forkjoin validation 105 */ 106 class NodeAndTopDecisionParent { 107 String node; 108 String topDecisionParent; 109 110 public NodeAndTopDecisionParent(String node, String topDecisionParent) { 111 this.node = node; 112 this.topDecisionParent = topDecisionParent; 113 } 114 } 115 116 private List<String> forkList = new ArrayList<String>(); 117 private List<String> joinList = new ArrayList<String>(); 118 private StartNodeDef startNode; 119 private List<NodeAndTopDecisionParent> visitedOkNodes = new ArrayList<NodeAndTopDecisionParent>(); 120 private List<String> visitedJoinNodes = new ArrayList<String>(); 121 122 public LiteWorkflowAppParser(Schema schema, 123 Class<? extends ControlNodeHandler> controlNodeHandler, 124 Class<? extends DecisionNodeHandler> decisionHandlerClass, 125 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException { 126 this.schema = schema; 127 this.controlNodeHandler = controlNodeHandler; 128 this.decisionHandlerClass = decisionHandlerClass; 129 this.actionHandlerClass = actionHandlerClass; 130 } 131 132 public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException { 133 return validateAndParse(reader, jobConf, null); 134 } 135 136 /** 137 * Parse and validate xml to {@link LiteWorkflowApp} 138 * 139 * @param reader 140 * @return LiteWorkflowApp 141 * @throws WorkflowException 142 */ 143 public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf, Configuration configDefault) 144 throws WorkflowException { 145 try { 146 StringWriter writer = new StringWriter(); 147 IOUtils.copyCharStream(reader, writer); 148 String strDef = writer.toString(); 149 150 if (schema != null) { 151 Validator validator = schema.newValidator(); 152 validator.validate(new StreamSource(new StringReader(strDef))); 153 } 154 155 Element wfDefElement = XmlUtils.parseXml(strDef); 156 ParameterVerifier.verifyParameters(jobConf, wfDefElement); 157 LiteWorkflowApp app = parse(strDef, wfDefElement, configDefault, jobConf); 158 Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>(); 159 traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING); 160 validate(app, app.getNode(StartNodeDef.START), traversed); 161 //Validate whether fork/join are in pair or not 162 if (jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true) && Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) { 163 validateForkJoin(app); 164 } 165 return app; 166 } 167 catch (ParameterVerifierException ex) { 168 throw new WorkflowException(ex); 169 } 170 catch (JDOMException ex) { 171 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex); 172 } 173 catch (SAXException ex) { 174 throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex); 175 } 176 catch (IOException ex) { 177 throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex); 178 } 179 } 180 181 /** 182 * Validate whether fork/join are in pair or not 183 * @param app LiteWorkflowApp 184 * @throws WorkflowException 185 */ 186 private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException { 187 // Make sure the number of forks and joins in wf are equal 188 if (forkList.size() != joinList.size()) { 189 throw new WorkflowException(ErrorCode.E0730); 190 } 191 192 // No need to bother going through all of this if there are no fork/join nodes 193 if (!forkList.isEmpty()) { 194 visitedOkNodes.clear(); 195 visitedJoinNodes.clear(); 196 validateForkJoin(startNode, app, new LinkedList<String>(), new LinkedList<String>(), new LinkedList<String>(), true, 197 null); 198 } 199 } 200 201 /* 202 * Recursively walk through the DAG and make sure that all fork paths are valid. 203 * This should be called from validateForkJoin(LiteWorkflowApp app). It assumes that visitedOkNodes and visitedJoinNodes are 204 * both empty ArrayLists on the first call. 205 * 206 * @param node the current node; use the startNode on the first call 207 * @param app the WorkflowApp 208 * @param forkNodes a stack of the current fork nodes 209 * @param joinNodes a stack of the current join nodes 210 * @param path a stack of the current path 211 * @param okTo false if node (or an ancestor of node) was gotten to via an "error to" transition or via a join node that has 212 * already been visited at least once before 213 * @param topDecisionParent The top (eldest) decision node along the path to this node, or null if there isn't one 214 * @throws WorkflowException 215 */ 216 private void validateForkJoin(NodeDef node, LiteWorkflowApp app, Deque<String> forkNodes, Deque<String> joinNodes, 217 Deque<String> path, boolean okTo, String topDecisionParent) throws WorkflowException { 218 if (path.contains(node.getName())) { 219 // cycle 220 throw new WorkflowException(ErrorCode.E0741, node.getName(), Arrays.toString(path.toArray())); 221 } 222 path.push(node.getName()); 223 224 // Make sure that we're not revisiting a node (that's not a Kill, Join, or End type) that's been visited before from an 225 // "ok to" transition; if its from an "error to" transition, then its okay to visit it multiple times. Also, because we 226 // traverse through join nodes multiple times, we have to make sure not to throw an exception here when we're really just 227 // re-walking the same execution path (this is why we need the visitedJoinNodes list used later) 228 if (okTo && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) { 229 NodeAndTopDecisionParent natdp = findInVisitedOkNodes(node.getName()); 230 if (natdp != null) { 231 // However, if we've visited the node and it's under a decision node, we may be seeing it again and it's only 232 // illegal if that decision node is not the same as what we're seeing now (because during execution we only go 233 // down one path of the decision node, so while we're seeing the node multiple times here, during runtime it will 234 // only be executed once). Also, this decision node should be the top (eldest) decision node. As null indicates 235 // that there isn't a decision node, when this happens they must both be null to be valid. Here is a good example 236 // to visualize a node ("actionX") that has three "ok to" paths to it, but should still be a valid workflow (it may 237 // be easier to see if you draw it): 238 // decisionA --> {actionX, decisionB} 239 // decisionB --> {actionX, actionY} 240 // actionY --> {actionX} 241 // And, if we visit this node twice under the same decision node in an invalid way, the path cycle checking code 242 // will catch it, so we don't have to worry about that here. 243 if ((natdp.topDecisionParent == null && topDecisionParent == null) 244 || (natdp.topDecisionParent == null && topDecisionParent != null) 245 || (natdp.topDecisionParent != null && topDecisionParent == null) 246 || !natdp.topDecisionParent.equals(topDecisionParent)) { 247 // If we get here, then we've seen this node before from an "ok to" transition but they don't have the same 248 // decision node top parent, which means that this node will be executed twice, which is illegal 249 throw new WorkflowException(ErrorCode.E0743, node.getName()); 250 } 251 } 252 else { 253 // If we haven't transitioned to this node before, add it and its top decision parent node 254 visitedOkNodes.add(new NodeAndTopDecisionParent(node.getName(), topDecisionParent)); 255 } 256 } 257 258 if (node instanceof StartNodeDef) { 259 String transition = node.getTransitions().get(0); // start always has only 1 transition 260 NodeDef tranNode = app.getNode(transition); 261 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, topDecisionParent); 262 } 263 else if (node instanceof ActionNodeDef) { 264 String transition = node.getTransitions().get(0); // "ok to" transition 265 NodeDef tranNode = app.getNode(transition); 266 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, topDecisionParent); // propogate okTo 267 transition = node.getTransitions().get(1); // "error to" transition 268 tranNode = app.getNode(transition); 269 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false, topDecisionParent); // use false 270 } 271 else if (node instanceof DecisionNodeDef) { 272 for(String transition : (new HashSet<String>(node.getTransitions()))) { 273 NodeDef tranNode = app.getNode(transition); 274 // if there currently isn't a topDecisionParent (i.e. null), then use this node instead of propagating null 275 String parentDecisionNode = topDecisionParent; 276 if (parentDecisionNode == null) { 277 parentDecisionNode = node.getName(); 278 } 279 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, parentDecisionNode); 280 } 281 } 282 else if (node instanceof ForkNodeDef) { 283 forkNodes.push(node.getName()); 284 List<String> transitionsList = node.getTransitions(); 285 HashSet<String> transitionsSet = new HashSet<String>(transitionsList); 286 // Check that a fork doesn't go to the same node more than once 287 if (!transitionsList.isEmpty() && transitionsList.size() != transitionsSet.size()) { 288 // Now we have to figure out which node is the problem and what type of node they are (join and kill are ok) 289 for (int i = 0; i < transitionsList.size(); i++) { 290 String a = transitionsList.get(i); 291 NodeDef aNode = app.getNode(a); 292 if (!(aNode instanceof JoinNodeDef) && !(aNode instanceof KillNodeDef)) { 293 for (int k = i+1; k < transitionsList.size(); k++) { 294 String b = transitionsList.get(k); 295 if (a.equals(b)) { 296 throw new WorkflowException(ErrorCode.E0744, node.getName(), a); 297 } 298 } 299 } 300 } 301 } 302 for(String transition : transitionsSet) { 303 NodeDef tranNode = app.getNode(transition); 304 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, topDecisionParent); 305 } 306 forkNodes.pop(); 307 if (!joinNodes.isEmpty()) { 308 joinNodes.pop(); 309 } 310 } 311 else if (node instanceof JoinNodeDef) { 312 if (forkNodes.isEmpty()) { 313 // no fork for join to match with 314 throw new WorkflowException(ErrorCode.E0742, node.getName()); 315 } 316 if (forkNodes.size() > joinNodes.size() && (joinNodes.isEmpty() || !joinNodes.peek().equals(node.getName()))) { 317 joinNodes.push(node.getName()); 318 } 319 if (!joinNodes.peek().equals(node.getName())) { 320 // join doesn't match fork 321 throw new WorkflowException(ErrorCode.E0732, forkNodes.peek(), node.getName(), joinNodes.peek()); 322 } 323 joinNodes.pop(); 324 String currentForkNode = forkNodes.pop(); 325 String transition = node.getTransitions().get(0); // join always has only 1 transition 326 NodeDef tranNode = app.getNode(transition); 327 // If we're already under a situation where okTo is false, use false (propogate it) 328 // Or if we've already visited this join node, use false (because we've already traversed this path before and we don't 329 // want to throw an exception from the check against visitedOkNodes) 330 if (!okTo || visitedJoinNodes.contains(node.getName())) { 331 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false, topDecisionParent); 332 // Else, use true because this is either the first time we've gone through this join node or okTo was already false 333 } else { 334 visitedJoinNodes.add(node.getName()); 335 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, true, topDecisionParent); 336 } 337 forkNodes.push(currentForkNode); 338 joinNodes.push(node.getName()); 339 } 340 else if (node instanceof KillNodeDef) { 341 // do nothing 342 } 343 else if (node instanceof EndNodeDef) { 344 if (!forkNodes.isEmpty()) { 345 path.pop(); // = node 346 String parent = path.peek(); 347 // can't go to an end node in a fork 348 throw new WorkflowException(ErrorCode.E0737, parent, node.getName()); 349 } 350 } 351 else { 352 // invalid node type (shouldn't happen) 353 throw new WorkflowException(ErrorCode.E0740, node.getName()); 354 } 355 path.pop(); 356 } 357 358 /** 359 * Return a {@link NodeAndTopDecisionParent} whose {@link NodeAndTopDecisionParent#node} is equal to the passed in name, or null 360 * if it isn't in the {@link LiteWorkflowAppParser#visitedOkNodes} list. 361 * 362 * @param name The name to search for 363 * @return a NodeAndTopDecisionParent or null 364 */ 365 private NodeAndTopDecisionParent findInVisitedOkNodes(String name) { 366 NodeAndTopDecisionParent natdp = null; 367 for (NodeAndTopDecisionParent v : visitedOkNodes) { 368 if (v.node.equals(name)) { 369 natdp = v; 370 break; 371 } 372 } 373 return natdp; 374 } 375 376 /** 377 * Parse xml to {@link LiteWorkflowApp} 378 * 379 * @param strDef 380 * @param root 381 * @param configDefault 382 * @param jobConf 383 * @return LiteWorkflowApp 384 * @throws WorkflowException 385 */ 386 @SuppressWarnings({"unchecked"}) 387 private LiteWorkflowApp parse(String strDef, Element root, Configuration configDefault, Configuration jobConf) 388 throws WorkflowException { 389 Namespace ns = root.getNamespace(); 390 LiteWorkflowApp def = null; 391 Element global = null; 392 for (Element eNode : (List<Element>) root.getChildren()) { 393 if (eNode.getName().equals(START_E)) { 394 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef, 395 new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A))); 396 } 397 else { 398 if (eNode.getName().equals(END_E)) { 399 def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler)); 400 } 401 else { 402 if (eNode.getName().equals(KILL_E)) { 403 def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), 404 eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler)); 405 } 406 else { 407 if (eNode.getName().equals(FORK_E)) { 408 List<String> paths = new ArrayList<String>(); 409 for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) { 410 paths.add(tran.getAttributeValue(FORK_START_A)); 411 } 412 def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths)); 413 } 414 else { 415 if (eNode.getName().equals(JOIN_E)) { 416 def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, 417 eNode.getAttributeValue(TO_A))); 418 } 419 else { 420 if (eNode.getName().equals(DECISION_E)) { 421 Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns); 422 List<String> transitions = new ArrayList<String>(); 423 for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) { 424 transitions.add(e.getAttributeValue(TO_A)); 425 } 426 transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A)); 427 428 String switchStatement = XmlUtils.prettyPrint(eSwitch).toString(); 429 def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass, 430 transitions)); 431 } 432 else { 433 if (ACTION_E.equals(eNode.getName())) { 434 String[] transitions = new String[2]; 435 Element eActionConf = null; 436 for (Element elem : (List<Element>) eNode.getChildren()) { 437 if (ACTION_OK_E.equals(elem.getName())) { 438 transitions[0] = elem.getAttributeValue(TO_A); 439 } 440 else { 441 if (ACTION_ERROR_E.equals(elem.getName())) { 442 transitions[1] = elem.getAttributeValue(TO_A); 443 } 444 else { 445 if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) { 446 continue; 447 } 448 else { 449 eActionConf = elem; 450 handleGlobal(ns, global, configDefault, elem); 451 } 452 } 453 } 454 } 455 456 String credStr = eNode.getAttributeValue(CRED_A); 457 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A); 458 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A); 459 try { 460 if (!StringUtils.isEmpty(userRetryMaxStr)) { 461 userRetryMaxStr = ELUtils.resolveAppName(userRetryMaxStr, jobConf); 462 } 463 if (!StringUtils.isEmpty(userRetryIntervalStr)) { 464 userRetryIntervalStr = ELUtils.resolveAppName(userRetryIntervalStr, 465 jobConf); 466 } 467 } 468 catch (Exception e) { 469 throw new WorkflowException(ErrorCode.E0703, e.getMessage()); 470 } 471 472 String actionConf = XmlUtils.prettyPrint(eActionConf).toString(); 473 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass, 474 transitions[0], transitions[1], credStr, 475 userRetryMaxStr, userRetryIntervalStr)); 476 } 477 else { 478 if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) { 479 // No operation is required 480 } 481 else { 482 if (eNode.getName().equals(GLOBAL)) { 483 global = eNode; 484 } 485 else { 486 if (eNode.getName().equals(PARAMETERS)) { 487 // No operation is required 488 } 489 else { 490 throw new WorkflowException(ErrorCode.E0703, eNode.getName()); 491 } 492 } 493 } 494 } 495 } 496 } 497 } 498 } 499 } 500 } 501 } 502 return def; 503 } 504 505 /** 506 * Validate workflow xml 507 * 508 * @param app 509 * @param node 510 * @param traversed 511 * @throws WorkflowException 512 */ 513 private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException { 514 if (node instanceof StartNodeDef) { 515 startNode = (StartNodeDef) node; 516 } 517 else { 518 try { 519 ParamChecker.validateActionName(node.getName()); 520 } 521 catch (IllegalArgumentException ex) { 522 throw new WorkflowException(ErrorCode.E0724, ex.getMessage()); 523 } 524 } 525 if (node instanceof ActionNodeDef) { 526 try { 527 Element action = XmlUtils.parseXml(node.getConf()); 528 boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null; 529 if (!supportedAction) { 530 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName()); 531 } 532 } 533 catch (JDOMException ex) { 534 throw new RuntimeException("It should never happen, " + ex.getMessage(), ex); 535 } 536 } 537 538 if(node instanceof ForkNodeDef){ 539 forkList.add(node.getName()); 540 } 541 542 if(node instanceof JoinNodeDef){ 543 joinList.add(node.getName()); 544 } 545 546 if (node instanceof EndNodeDef) { 547 traversed.put(node.getName(), VisitStatus.VISITED); 548 return; 549 } 550 if (node instanceof KillNodeDef) { 551 traversed.put(node.getName(), VisitStatus.VISITED); 552 return; 553 } 554 for (String transition : node.getTransitions()) { 555 556 if (app.getNode(transition) == null) { 557 throw new WorkflowException(ErrorCode.E0708, node.getName(), transition); 558 } 559 560 //check if it is a cycle 561 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) { 562 throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName()); 563 } 564 //ignore validated one 565 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) { 566 continue; 567 } 568 569 traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING); 570 validate(app, app.getNode(transition), traversed); 571 } 572 traversed.put(node.getName(), VisitStatus.VISITED); 573 } 574 575 /** 576 * Handle the global section 577 * 578 * @param ns 579 * @param global 580 * @param eActionConf 581 * @throws WorkflowException 582 */ 583 584 @SuppressWarnings("unchecked") 585 private void handleGlobal(Namespace ns, Element global, Configuration configDefault, Element eActionConf) 586 throws WorkflowException { 587 588 // Use the action's namespace when getting children of the action (will 589 // be different than ns for extension actions) 590 Namespace actionNs = eActionConf.getNamespace(); 591 592 if (global != null) { 593 Element globalJobTracker = global.getChild("job-tracker", ns); 594 Element globalNameNode = global.getChild("name-node", ns); 595 List<Element> globalJobXml = global.getChildren("job-xml", ns); 596 Element globalConfiguration = global.getChild("configuration", ns); 597 598 if (globalJobTracker != null && eActionConf.getChild("job-tracker", actionNs) == null) { 599 Element jobTracker = new Element("job-tracker", actionNs); 600 jobTracker.setText(globalJobTracker.getText()); 601 eActionConf.addContent(jobTracker); 602 } 603 604 if (globalNameNode != null && eActionConf.getChild("name-node", actionNs) == null) { 605 Element nameNode = new Element("name-node", actionNs); 606 nameNode.setText(globalNameNode.getText()); 607 eActionConf.addContent(nameNode); 608 } 609 610 if (!globalJobXml.isEmpty()) { 611 List<Element> actionJobXml = eActionConf.getChildren("job-xml", actionNs); 612 for(Element jobXml: globalJobXml){ 613 boolean alreadyExists = false; 614 for(Element actionXml: actionJobXml){ 615 if(jobXml.getText().equals(actionXml.getText())){ 616 alreadyExists = true; 617 break; 618 } 619 } 620 621 if (!alreadyExists){ 622 Element ejobXml = new Element("job-xml", actionNs); 623 ejobXml.setText(jobXml.getText()); 624 eActionConf.addContent(ejobXml); 625 } 626 627 } 628 } 629 try { 630 XConfiguration actionConf = new XConfiguration(); 631 if (configDefault != null) 632 XConfiguration.copy(configDefault, actionConf); 633 if (globalConfiguration != null) { 634 Configuration globalConf = new XConfiguration(new StringReader(XmlUtils.prettyPrint( 635 globalConfiguration).toString())); 636 XConfiguration.copy(globalConf, actionConf); 637 } 638 Element actionConfiguration = eActionConf.getChild("configuration", actionNs); 639 if (actionConfiguration != null) { 640 //copy and override 641 XConfiguration.copy(new XConfiguration(new StringReader(XmlUtils.prettyPrint( 642 actionConfiguration).toString())), actionConf); 643 } 644 int position = eActionConf.indexOf(actionConfiguration); 645 eActionConf.removeContent(actionConfiguration); //replace with enhanced one 646 Element eConfXml = XmlUtils.parseXml(actionConf.toXmlString(false)); 647 eConfXml.detach(); 648 eConfXml.setNamespace(actionNs); 649 if (position > 0) { 650 eActionConf.addContent(position, eConfXml); 651 } 652 else { 653 eActionConf.addContent(eConfXml); 654 } 655 } 656 catch (IOException e) { 657 throw new WorkflowException(ErrorCode.E0700, "Error while processing action conf"); 658 } 659 catch (JDOMException e) { 660 throw new WorkflowException(ErrorCode.E0700, "Error while processing action conf"); 661 } 662 } 663 else { 664 ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(eActionConf.getName()); 665 if (ae == null) { 666 throw new WorkflowException(ErrorCode.E0723, eActionConf.getName(), ActionService.class.getName()); 667 } 668 if (ae.requiresNNJT) { 669 670 if (eActionConf.getChild("name-node", actionNs) == null) { 671 throw new WorkflowException(ErrorCode.E0701, "No name-node defined"); 672 } 673 if (eActionConf.getChild("job-tracker", actionNs) == null) { 674 throw new WorkflowException(ErrorCode.E0701, "No job-tracker defined"); 675 } 676 } 677 } 678 } 679 680}