001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.workflow.lite; 019 020 import org.apache.oozie.workflow.WorkflowException; 021 import org.apache.oozie.util.IOUtils; 022 import org.apache.oozie.util.XmlUtils; 023 import org.apache.oozie.util.ParamChecker; 024 import org.apache.oozie.util.ParameterVerifier; 025 import org.apache.oozie.util.ParameterVerifierException; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.action.ActionExecutor; 028 import org.apache.oozie.service.Services; 029 import org.apache.oozie.service.ActionService; 030 import org.apache.hadoop.conf.Configuration; 031 import org.jdom.Element; 032 import org.jdom.JDOMException; 033 import org.jdom.Namespace; 034 import org.xml.sax.SAXException; 035 036 import javax.xml.transform.stream.StreamSource; 037 import javax.xml.validation.Schema; 038 import javax.xml.validation.Validator; 039 import java.io.IOException; 040 import java.io.Reader; 041 import java.io.StringReader; 042 import java.io.StringWriter; 043 import java.util.ArrayList; 044 import java.util.Arrays; 045 import java.util.Deque; 046 import java.util.HashMap; 047 import java.util.HashSet; 048 import java.util.LinkedList; 049 import java.util.List; 050 import java.util.Map; 051 052 /** 053 * Class to parse and validate workflow xml 054 */ 055 public class LiteWorkflowAppParser { 056 057 private static final String DECISION_E = "decision"; 058 private static final String ACTION_E = "action"; 059 private static final String END_E = "end"; 060 private static final String START_E = "start"; 061 private static final String JOIN_E = "join"; 062 private static final String FORK_E = "fork"; 063 private static final Object KILL_E = "kill"; 064 065 private static final String SLA_INFO = "info"; 066 private static final String CREDENTIALS = "credentials"; 067 private static final String GLOBAL = "global"; 068 private static final String PARAMETERS = "parameters"; 069 070 private static final String NAME_A = "name"; 071 private static final String CRED_A = "cred"; 072 private static final String USER_RETRY_MAX_A = "retry-max"; 073 private static final String USER_RETRY_INTERVAL_A = "retry-interval"; 074 private static final String TO_A = "to"; 075 076 private static final String FORK_PATH_E = "path"; 077 private static final String FORK_START_A = "start"; 078 079 private static final String ACTION_OK_E = "ok"; 080 private static final String ACTION_ERROR_E = "error"; 081 082 private static final String DECISION_SWITCH_E = "switch"; 083 private static final String DECISION_CASE_E = "case"; 084 private static final String DECISION_DEFAULT_E = "default"; 085 086 private static final String KILL_MESSAGE_E = "message"; 087 public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin"; 088 public static final String WF_VALIDATE_FORK_JOIN = "oozie.wf.validate.ForkJoin"; 089 090 private Schema schema; 091 private Class<? extends ControlNodeHandler> controlNodeHandler; 092 private Class<? extends DecisionNodeHandler> decisionHandlerClass; 093 private Class<? extends ActionNodeHandler> actionHandlerClass; 094 095 private static enum VisitStatus { 096 VISITING, VISITED 097 } 098 099 private List<String> forkList = new ArrayList<String>(); 100 private List<String> joinList = new ArrayList<String>(); 101 private StartNodeDef startNode; 102 private List<String> visitedOkNodes = new ArrayList<String>(); 103 private List<String> visitedJoinNodes = new ArrayList<String>(); 104 105 public LiteWorkflowAppParser(Schema schema, 106 Class<? extends ControlNodeHandler> controlNodeHandler, 107 Class<? extends DecisionNodeHandler> decisionHandlerClass, 108 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException { 109 this.schema = schema; 110 this.controlNodeHandler = controlNodeHandler; 111 this.decisionHandlerClass = decisionHandlerClass; 112 this.actionHandlerClass = actionHandlerClass; 113 } 114 115 /** 116 * Parse and validate xml to {@link LiteWorkflowApp} 117 * 118 * @param reader 119 * @return LiteWorkflowApp 120 * @throws WorkflowException 121 */ 122 public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException { 123 try { 124 StringWriter writer = new StringWriter(); 125 IOUtils.copyCharStream(reader, writer); 126 String strDef = writer.toString(); 127 128 if (schema != null) { 129 Validator validator = schema.newValidator(); 130 validator.validate(new StreamSource(new StringReader(strDef))); 131 } 132 133 Element wfDefElement = XmlUtils.parseXml(strDef); 134 ParameterVerifier.verifyParameters(jobConf, wfDefElement); 135 LiteWorkflowApp app = parse(strDef, wfDefElement); 136 Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>(); 137 traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING); 138 validate(app, app.getNode(StartNodeDef.START), traversed); 139 //Validate whether fork/join are in pair or not 140 if (jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true) && Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) { 141 validateForkJoin(app); 142 } 143 return app; 144 } 145 catch (ParameterVerifierException ex) { 146 throw new WorkflowException(ex); 147 } 148 catch (JDOMException ex) { 149 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex); 150 } 151 catch (SAXException ex) { 152 throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex); 153 } 154 catch (IOException ex) { 155 throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex); 156 } 157 } 158 159 /** 160 * Validate whether fork/join are in pair or not 161 * @param app LiteWorkflowApp 162 * @throws WorkflowException 163 */ 164 private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException { 165 // Make sure the number of forks and joins in wf are equal 166 if (forkList.size() != joinList.size()) { 167 throw new WorkflowException(ErrorCode.E0730); 168 } 169 170 // No need to bother going through all of this if there are no fork/join nodes 171 if (!forkList.isEmpty()) { 172 visitedOkNodes.clear(); 173 visitedJoinNodes.clear(); 174 validateForkJoin(startNode, app, new LinkedList<String>(), new LinkedList<String>(), new LinkedList<String>(), true); 175 } 176 } 177 178 /* 179 * Recursively walk through the DAG and make sure that all fork paths are valid. 180 * This should be called from validateForkJoin(LiteWorkflowApp app). It assumes that visitedOkNodes and visitedJoinNodes are 181 * both empty ArrayLists on the first call. 182 * 183 * @param node the current node; use the startNode on the first call 184 * @param app the WorkflowApp 185 * @param forkNodes a stack of the current fork nodes 186 * @param joinNodes a stack of the current join nodes 187 * @param path a stack of the current path 188 * @param okTo false if node (or an ancestor of node) was gotten to via an "error to" transition or via a join node that has 189 * already been visited at least once before 190 * @throws WorkflowException 191 */ 192 private void validateForkJoin(NodeDef node, LiteWorkflowApp app, Deque<String> forkNodes, Deque<String> joinNodes, 193 Deque<String> path, boolean okTo) throws WorkflowException { 194 if (path.contains(node.getName())) { 195 // cycle 196 throw new WorkflowException(ErrorCode.E0741, node.getName(), Arrays.toString(path.toArray())); 197 } 198 path.push(node.getName()); 199 200 // Make sure that we're not revisiting a node (that's not a Kill, Join, or End type) that's been visited before from an 201 // "ok to" transition; if its from an "error to" transition, then its okay to visit it multiple times. Also, because we 202 // traverse through join nodes multiple times, we have to make sure not to throw an exception here when we're really just 203 // re-walking the same execution path (this is why we need the visitedJoinNodes list used later) 204 if (okTo && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) { 205 if (visitedOkNodes.contains(node.getName())) { 206 throw new WorkflowException(ErrorCode.E0743, node.getName()); 207 } 208 visitedOkNodes.add(node.getName()); 209 } 210 211 if (node instanceof StartNodeDef) { 212 String transition = node.getTransitions().get(0); // start always has only 1 transition 213 NodeDef tranNode = app.getNode(transition); 214 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo); 215 } 216 else if (node instanceof ActionNodeDef) { 217 String transition = node.getTransitions().get(0); // "ok to" transition 218 NodeDef tranNode = app.getNode(transition); 219 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo); // propogate okTo 220 transition = node.getTransitions().get(1); // "error to" transition 221 tranNode = app.getNode(transition); 222 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false); // use false 223 } 224 else if (node instanceof DecisionNodeDef) { 225 for(String transition : (new HashSet<String>(node.getTransitions()))) { 226 NodeDef tranNode = app.getNode(transition); 227 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo); 228 } 229 } 230 else if (node instanceof ForkNodeDef) { 231 forkNodes.push(node.getName()); 232 for(String transition : (new HashSet<String>(node.getTransitions()))) { 233 NodeDef tranNode = app.getNode(transition); 234 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo); 235 } 236 forkNodes.pop(); 237 if (!joinNodes.isEmpty()) { 238 joinNodes.pop(); 239 } 240 } 241 else if (node instanceof JoinNodeDef) { 242 if (forkNodes.isEmpty()) { 243 // no fork for join to match with 244 throw new WorkflowException(ErrorCode.E0742, node.getName()); 245 } 246 if (forkNodes.size() > joinNodes.size() && (joinNodes.isEmpty() || !joinNodes.peek().equals(node.getName()))) { 247 joinNodes.push(node.getName()); 248 } 249 if (!joinNodes.peek().equals(node.getName())) { 250 // join doesn't match fork 251 throw new WorkflowException(ErrorCode.E0732, forkNodes.peek(), node.getName(), joinNodes.peek()); 252 } 253 joinNodes.pop(); 254 String currentForkNode = forkNodes.pop(); 255 String transition = node.getTransitions().get(0); // join always has only 1 transition 256 NodeDef tranNode = app.getNode(transition); 257 // If we're already under a situation where okTo is false, use false (propogate it) 258 // Or if we've already visited this join node, use false (because we've already traversed this path before and we don't 259 // want to throw an exception from the check against visitedOkNodes) 260 if (!okTo || visitedJoinNodes.contains(node.getName())) { 261 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false); 262 // Else, use true because this is either the first time we've gone through this join node or okTo was already false 263 } else { 264 visitedJoinNodes.add(node.getName()); 265 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, true); 266 } 267 forkNodes.push(currentForkNode); 268 joinNodes.push(node.getName()); 269 } 270 else if (node instanceof KillNodeDef) { 271 // do nothing 272 } 273 else if (node instanceof EndNodeDef) { 274 if (!forkNodes.isEmpty()) { 275 path.pop(); // = node 276 String parent = path.peek(); 277 // can't go to an end node in a fork 278 throw new WorkflowException(ErrorCode.E0737, parent, node.getName()); 279 } 280 } 281 else { 282 // invalid node type (shouldn't happen) 283 throw new WorkflowException(ErrorCode.E0740, node.getName()); 284 } 285 path.pop(); 286 } 287 288 /** 289 * Parse xml to {@link LiteWorkflowApp} 290 * 291 * @param strDef 292 * @param root 293 * @return LiteWorkflowApp 294 * @throws WorkflowException 295 */ 296 @SuppressWarnings({"unchecked", "ConstantConditions"}) 297 private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException { 298 Namespace ns = root.getNamespace(); 299 LiteWorkflowApp def = null; 300 Element global = null; 301 for (Element eNode : (List<Element>) root.getChildren()) { 302 if (eNode.getName().equals(START_E)) { 303 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef, 304 new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A))); 305 } 306 else { 307 if (eNode.getName().equals(END_E)) { 308 def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler)); 309 } 310 else { 311 if (eNode.getName().equals(KILL_E)) { 312 def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), 313 eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler)); 314 } 315 else { 316 if (eNode.getName().equals(FORK_E)) { 317 List<String> paths = new ArrayList<String>(); 318 for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) { 319 paths.add(tran.getAttributeValue(FORK_START_A)); 320 } 321 def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths)); 322 } 323 else { 324 if (eNode.getName().equals(JOIN_E)) { 325 def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, 326 eNode.getAttributeValue(TO_A))); 327 } 328 else { 329 if (eNode.getName().equals(DECISION_E)) { 330 Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns); 331 List<String> transitions = new ArrayList<String>(); 332 for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) { 333 transitions.add(e.getAttributeValue(TO_A)); 334 } 335 transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A)); 336 337 String switchStatement = XmlUtils.prettyPrint(eSwitch).toString(); 338 def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass, 339 transitions)); 340 } 341 else { 342 if (ACTION_E.equals(eNode.getName())) { 343 String[] transitions = new String[2]; 344 Element eActionConf = null; 345 for (Element elem : (List<Element>) eNode.getChildren()) { 346 if (ACTION_OK_E.equals(elem.getName())) { 347 transitions[0] = elem.getAttributeValue(TO_A); 348 } 349 else { 350 if (ACTION_ERROR_E.equals(elem.getName())) { 351 transitions[1] = elem.getAttributeValue(TO_A); 352 } 353 else { 354 if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) { 355 continue; 356 } 357 else { 358 eActionConf = elem; 359 handleGlobal(ns, global, elem); 360 } 361 } 362 } 363 } 364 365 String credStr = eNode.getAttributeValue(CRED_A); 366 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A); 367 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A); 368 369 String actionConf = XmlUtils.prettyPrint(eActionConf).toString(); 370 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass, 371 transitions[0], transitions[1], credStr, 372 userRetryMaxStr, userRetryIntervalStr)); 373 } 374 else { 375 if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) { 376 // No operation is required 377 } 378 else { 379 if (eNode.getName().equals(GLOBAL)) { 380 global = eNode; 381 } 382 else { 383 if (eNode.getName().equals(PARAMETERS)) { 384 // No operation is required 385 } 386 else { 387 throw new WorkflowException(ErrorCode.E0703, eNode.getName()); 388 } 389 } 390 } 391 } 392 } 393 } 394 } 395 } 396 } 397 } 398 } 399 return def; 400 } 401 402 /** 403 * Validate workflow xml 404 * 405 * @param app 406 * @param node 407 * @param traversed 408 * @throws WorkflowException 409 */ 410 private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException { 411 if (node instanceof StartNodeDef) { 412 startNode = (StartNodeDef) node; 413 } 414 else { 415 try { 416 ParamChecker.validateActionName(node.getName()); 417 } 418 catch (IllegalArgumentException ex) { 419 throw new WorkflowException(ErrorCode.E0724, ex.getMessage()); 420 } 421 } 422 if (node instanceof ActionNodeDef) { 423 try { 424 Element action = XmlUtils.parseXml(node.getConf()); 425 boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null; 426 if (!supportedAction) { 427 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName()); 428 } 429 } 430 catch (JDOMException ex) { 431 throw new RuntimeException("It should never happen, " + ex.getMessage(), ex); 432 } 433 } 434 435 if(node instanceof ForkNodeDef){ 436 forkList.add(node.getName()); 437 } 438 439 if(node instanceof JoinNodeDef){ 440 joinList.add(node.getName()); 441 } 442 443 if (node instanceof EndNodeDef) { 444 traversed.put(node.getName(), VisitStatus.VISITED); 445 return; 446 } 447 if (node instanceof KillNodeDef) { 448 traversed.put(node.getName(), VisitStatus.VISITED); 449 return; 450 } 451 for (String transition : node.getTransitions()) { 452 453 if (app.getNode(transition) == null) { 454 throw new WorkflowException(ErrorCode.E0708, node.getName(), transition); 455 } 456 457 //check if it is a cycle 458 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) { 459 throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName()); 460 } 461 //ignore validated one 462 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) { 463 continue; 464 } 465 466 traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING); 467 validate(app, app.getNode(transition), traversed); 468 } 469 traversed.put(node.getName(), VisitStatus.VISITED); 470 } 471 472 /** 473 * Handle the global section 474 * 475 * @param ns 476 * @param global 477 * @param eActionConf 478 * @throws WorkflowException 479 */ 480 481 private void handleGlobal(Namespace ns, Element global, Element eActionConf) throws WorkflowException { 482 483 // Use the action's namespace when getting children of the action (will be different than ns for extension actions) 484 Namespace actionNs = eActionConf.getNamespace(); 485 486 if (global != null) { 487 Element globalJobTracker = global.getChild("job-tracker", ns); 488 Element globalNameNode = global.getChild("name-node", ns); 489 List<Element> globalJobXml = global.getChildren("job-xml", ns); 490 Element globalConfiguration = global.getChild("configuration", ns); 491 492 if (globalJobTracker != null && eActionConf.getChild("job-tracker", actionNs) == null) { 493 Element jobTracker = new Element("job-tracker", actionNs); 494 jobTracker.setText(globalJobTracker.getText()); 495 eActionConf.addContent(jobTracker); 496 } 497 498 if (globalNameNode != null && eActionConf.getChild("name-node", actionNs) == null) { 499 Element nameNode = new Element("name-node", actionNs); 500 nameNode.setText(globalNameNode.getText()); 501 eActionConf.addContent(nameNode); 502 } 503 504 if (!globalJobXml.isEmpty()) { 505 List<Element> actionJobXml = eActionConf.getChildren("job-xml", actionNs); 506 for(Element jobXml: globalJobXml){ 507 boolean alreadyExists = false; 508 for(Element actionXml: actionJobXml){ 509 if(jobXml.getText().equals(actionXml.getText())){ 510 alreadyExists = true; 511 } 512 } 513 514 if (!alreadyExists){ 515 Element ejobXml = new Element("job-xml", actionNs); 516 ejobXml.setText(jobXml.getText()); 517 eActionConf.addContent(ejobXml); 518 } 519 520 } 521 } 522 523 if (globalConfiguration != null) { 524 Element actionConfiguration = eActionConf.getChild("configuration", actionNs); 525 if (actionConfiguration == null) { 526 actionConfiguration = new Element("configuration", actionNs); 527 eActionConf.addContent(actionConfiguration); 528 } 529 for (Element globalConfig : (List<Element>) globalConfiguration.getChildren()) { 530 boolean isSet = false; 531 String globalVarName = globalConfig.getChildText("name", ns); 532 for (Element local : (List<Element>) actionConfiguration.getChildren()) { 533 if (local.getChildText("name", actionNs).equals(globalVarName)) { 534 isSet = true; 535 } 536 } 537 if (!isSet) { 538 Element varToCopy = new Element("property", actionNs); 539 Element varName = new Element("name", actionNs); 540 Element varValue = new Element("value", actionNs); 541 542 varName.setText(globalConfig.getChildText("name", ns)); 543 varValue.setText(globalConfig.getChildText("value", ns)); 544 545 varToCopy.addContent(varName); 546 varToCopy.addContent(varValue); 547 548 actionConfiguration.addContent(varToCopy); 549 } 550 } 551 } 552 } 553 else { 554 ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(eActionConf.getName()); 555 if (ae == null) { 556 throw new WorkflowException(ErrorCode.E0723, eActionConf.getName(), ActionService.class.getName()); 557 } 558 if (ae.requiresNNJT) { 559 560 if (eActionConf.getChild("name-node", actionNs) == null) { 561 throw new WorkflowException(ErrorCode.E0701, "No name-node defined"); 562 } 563 if (eActionConf.getChild("job-tracker", actionNs) == null) { 564 throw new WorkflowException(ErrorCode.E0701, "No job-tracker defined"); 565 } 566 } 567 } 568 } 569 570 }