001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.workflow.lite; 019 020 import org.apache.oozie.workflow.WorkflowException; 021 import org.apache.oozie.util.IOUtils; 022 import org.apache.oozie.util.XmlUtils; 023 import org.apache.oozie.util.ParamChecker; 024 import org.apache.oozie.util.ParameterVerifier; 025 import org.apache.oozie.util.ParameterVerifierException; 026 import org.apache.oozie.ErrorCode; 027 import org.apache.oozie.action.ActionExecutor; 028 import org.apache.oozie.service.Services; 029 import org.apache.oozie.service.ActionService; 030 import org.apache.hadoop.conf.Configuration; 031 import org.jdom.Element; 032 import org.jdom.JDOMException; 033 import org.jdom.Namespace; 034 import org.xml.sax.SAXException; 035 036 import javax.xml.transform.stream.StreamSource; 037 import javax.xml.validation.Schema; 038 import javax.xml.validation.Validator; 039 import java.io.IOException; 040 import java.io.Reader; 041 import java.io.StringReader; 042 import java.io.StringWriter; 043 import java.util.ArrayList; 044 import java.util.HashMap; 045 import java.util.HashSet; 046 import java.util.List; 047 import java.util.Map; 048 049 /** 050 * Class to parse and validate workflow xml 051 */ 052 public class LiteWorkflowAppParser { 053 054 private static final String DECISION_E = "decision"; 055 private static final String ACTION_E = "action"; 056 private static final String END_E = "end"; 057 private static final String START_E = "start"; 058 private static final String JOIN_E = "join"; 059 private static final String FORK_E = "fork"; 060 private static final Object KILL_E = "kill"; 061 062 private static final String SLA_INFO = "info"; 063 private static final String CREDENTIALS = "credentials"; 064 private static final String GLOBAL = "global"; 065 private static final String PARAMETERS = "parameters"; 066 067 private static final String NAME_A = "name"; 068 private static final String CRED_A = "cred"; 069 private static final String USER_RETRY_MAX_A = "retry-max"; 070 private static final String USER_RETRY_INTERVAL_A = "retry-interval"; 071 private static final String TO_A = "to"; 072 073 private static final String FORK_PATH_E = "path"; 074 private static final String FORK_START_A = "start"; 075 076 private static final String ACTION_OK_E = "ok"; 077 private static final String ACTION_ERROR_E = "error"; 078 079 private static final String DECISION_SWITCH_E = "switch"; 080 private static final String DECISION_CASE_E = "case"; 081 private static final String DECISION_DEFAULT_E = "default"; 082 083 private static final String KILL_MESSAGE_E = "message"; 084 public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin"; 085 086 private Schema schema; 087 private Class<? extends ControlNodeHandler> controlNodeHandler; 088 private Class<? extends DecisionNodeHandler> decisionHandlerClass; 089 private Class<? extends ActionNodeHandler> actionHandlerClass; 090 091 private static enum VisitStatus { 092 VISITING, VISITED 093 } 094 095 private List<String> forkList = new ArrayList<String>(); 096 private List<String> joinList = new ArrayList<String>(); 097 098 public LiteWorkflowAppParser(Schema schema, 099 Class<? extends ControlNodeHandler> controlNodeHandler, 100 Class<? extends DecisionNodeHandler> decisionHandlerClass, 101 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException { 102 this.schema = schema; 103 this.controlNodeHandler = controlNodeHandler; 104 this.decisionHandlerClass = decisionHandlerClass; 105 this.actionHandlerClass = actionHandlerClass; 106 } 107 108 /** 109 * Parse and validate xml to {@link LiteWorkflowApp} 110 * 111 * @param reader 112 * @return LiteWorkflowApp 113 * @throws WorkflowException 114 */ 115 public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException { 116 try { 117 StringWriter writer = new StringWriter(); 118 IOUtils.copyCharStream(reader, writer); 119 String strDef = writer.toString(); 120 121 if (schema != null) { 122 Validator validator = schema.newValidator(); 123 validator.validate(new StreamSource(new StringReader(strDef))); 124 } 125 126 Element wfDefElement = XmlUtils.parseXml(strDef); 127 ParameterVerifier.verifyParameters(jobConf, wfDefElement); 128 LiteWorkflowApp app = parse(strDef, wfDefElement); 129 Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>(); 130 traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING); 131 validate(app, app.getNode(StartNodeDef.START), traversed); 132 //Validate whether fork/join are in pair or not 133 if (Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) { 134 validateForkJoin(app); 135 } 136 return app; 137 } 138 catch (ParameterVerifierException ex) { 139 throw new WorkflowException(ex); 140 } 141 catch (JDOMException ex) { 142 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex); 143 } 144 catch (SAXException ex) { 145 throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex); 146 } 147 catch (IOException ex) { 148 throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex); 149 } 150 } 151 152 /** 153 * Validate whether fork/join are in pair or not 154 * @param app LiteWorkflowApp 155 * @throws WorkflowException 156 */ 157 private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException { 158 // Make sure the number of forks and joins in wf are equal 159 if (forkList.size() != joinList.size()) { 160 throw new WorkflowException(ErrorCode.E0730); 161 } 162 163 while(!forkList.isEmpty()){ 164 // Make sure each of the fork node has a corresponding join; start with the root fork Node first 165 validateFork(app.getNode(forkList.remove(0)), app); 166 } 167 168 } 169 170 /* 171 * Test whether the fork node has a corresponding join 172 * @param node - the fork node 173 * @param app - the WorkflowApp 174 * @return 175 * @throws WorkflowException 176 */ 177 private NodeDef validateFork(NodeDef forkNode, LiteWorkflowApp app) throws WorkflowException { 178 List<String> transitions = new ArrayList<String>(forkNode.getTransitions()); 179 // list for keeping track of "error-to" transitions of Action Node 180 List<String> errorToTransitions = new ArrayList<String>(); 181 String joinNode = null; 182 for (int i = 0; i < transitions.size(); i++) { 183 NodeDef node = app.getNode(transitions.get(i)); 184 if (node instanceof DecisionNodeDef) { 185 // Make sure the transition is valid 186 validateTransition(errorToTransitions, transitions, app, node); 187 // Add each transition to transitions (once) if they are not a kill node 188 HashSet<String> decisionSet = new HashSet<String>(node.getTransitions()); 189 for (String ds : decisionSet) { 190 if (!(app.getNode(ds) instanceof KillNodeDef)) { 191 transitions.add(ds); 192 } 193 } 194 } else if (node instanceof ActionNodeDef) { 195 // Make sure the transition is valid 196 validateTransition(errorToTransitions, transitions, app, node); 197 // Add the "ok-to" transition of node if its not a kill node 198 String okTo = node.getTransitions().get(0); 199 if (!(app.getNode(okTo) instanceof KillNodeDef)) { 200 transitions.add(okTo); 201 } 202 String errorTo = node.getTransitions().get(1); 203 // Add the "error-to" transition if the transition is a Action Node 204 if (app.getNode(errorTo) instanceof ActionNodeDef) { 205 errorToTransitions.add(errorTo); 206 } 207 } else if (node instanceof ForkNodeDef) { 208 forkList.remove(node.getName()); 209 // Make a recursive call to resolve this fork node 210 NodeDef joinNd = validateFork(node, app); 211 // Make sure the transition is valid 212 validateTransition(errorToTransitions, transitions, app, node); 213 // Add the "ok-to" transition of node 214 transitions.add(joinNd.getTransitions().get(0)); 215 } else if (node instanceof JoinNodeDef) { 216 // If joinNode encountered for the first time, remove it from the joinList and remember it 217 String currentJoin = node.getName(); 218 if (joinList.contains(currentJoin)) { 219 joinList.remove(currentJoin); 220 joinNode = currentJoin; 221 } else { 222 // Make sure this join is the same as the join seen from the first time 223 if (joinNode == null) { 224 throw new WorkflowException(ErrorCode.E0733, forkNode); 225 } 226 if (!joinNode.equals(currentJoin)) { 227 throw new WorkflowException(ErrorCode.E0732, forkNode, joinNode); 228 } 229 } 230 } else { 231 throw new WorkflowException(ErrorCode.E0730); 232 } 233 } 234 return app.getNode(joinNode); 235 236 } 237 238 private void validateTransition(List<String> errorToTransitions, List<String> transitions, LiteWorkflowApp app, NodeDef node) 239 throws WorkflowException { 240 for (String transition : node.getTransitions()) { 241 // Make sure the transition node is not an end node 242 NodeDef tNode = app.getNode(transition); 243 if (tNode instanceof EndNodeDef) { 244 throw new WorkflowException(ErrorCode.E0737, node.getName(), transition, "end"); 245 } 246 // Make sure the transition node is either a join node or is not already visited 247 if (transitions.contains(transition) && !(tNode instanceof JoinNodeDef)) { 248 throw new WorkflowException(ErrorCode.E0734, node.getName(), transition); 249 } 250 // Make sure the transition node is not the same as an already visited 'error-to' transition 251 if (errorToTransitions.contains(transition)) { 252 throw new WorkflowException(ErrorCode.E0735, node.getName(), transition); 253 } 254 } 255 256 } 257 258 /** 259 * Parse xml to {@link LiteWorkflowApp} 260 * 261 * @param strDef 262 * @param root 263 * @return LiteWorkflowApp 264 * @throws WorkflowException 265 */ 266 @SuppressWarnings({"unchecked", "ConstantConditions"}) 267 private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException { 268 Namespace ns = root.getNamespace(); 269 LiteWorkflowApp def = null; 270 Element global = null; 271 for (Element eNode : (List<Element>) root.getChildren()) { 272 if (eNode.getName().equals(START_E)) { 273 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef, 274 new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A))); 275 } 276 else { 277 if (eNode.getName().equals(END_E)) { 278 def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler)); 279 } 280 else { 281 if (eNode.getName().equals(KILL_E)) { 282 def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), 283 eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler)); 284 } 285 else { 286 if (eNode.getName().equals(FORK_E)) { 287 List<String> paths = new ArrayList<String>(); 288 for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) { 289 paths.add(tran.getAttributeValue(FORK_START_A)); 290 } 291 def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths)); 292 } 293 else { 294 if (eNode.getName().equals(JOIN_E)) { 295 def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, 296 eNode.getAttributeValue(TO_A))); 297 } 298 else { 299 if (eNode.getName().equals(DECISION_E)) { 300 Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns); 301 List<String> transitions = new ArrayList<String>(); 302 for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) { 303 transitions.add(e.getAttributeValue(TO_A)); 304 } 305 transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A)); 306 307 String switchStatement = XmlUtils.prettyPrint(eSwitch).toString(); 308 def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass, 309 transitions)); 310 } 311 else { 312 if (ACTION_E.equals(eNode.getName())) { 313 String[] transitions = new String[2]; 314 Element eActionConf = null; 315 for (Element elem : (List<Element>) eNode.getChildren()) { 316 if (ACTION_OK_E.equals(elem.getName())) { 317 transitions[0] = elem.getAttributeValue(TO_A); 318 } 319 else { 320 if (ACTION_ERROR_E.equals(elem.getName())) { 321 transitions[1] = elem.getAttributeValue(TO_A); 322 } 323 else { 324 if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) { 325 continue; 326 } 327 else { 328 eActionConf = elem; 329 handleGlobal(ns, global, elem); 330 } 331 } 332 } 333 } 334 335 String credStr = eNode.getAttributeValue(CRED_A); 336 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A); 337 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A); 338 339 String actionConf = XmlUtils.prettyPrint(eActionConf).toString(); 340 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass, 341 transitions[0], transitions[1], credStr, 342 userRetryMaxStr, userRetryIntervalStr)); 343 } 344 else { 345 if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) { 346 // No operation is required 347 } 348 else { 349 if (eNode.getName().equals(GLOBAL)) { 350 global = eNode; 351 } 352 else { 353 if (eNode.getName().equals(PARAMETERS)) { 354 // No operation is required 355 } 356 else { 357 throw new WorkflowException(ErrorCode.E0703, eNode.getName()); 358 } 359 } 360 } 361 } 362 } 363 } 364 } 365 } 366 } 367 } 368 } 369 return def; 370 } 371 372 /** 373 * Validate workflow xml 374 * 375 * @param app 376 * @param node 377 * @param traversed 378 * @throws WorkflowException 379 */ 380 private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException { 381 if (!(node instanceof StartNodeDef)) { 382 try { 383 ParamChecker.validateActionName(node.getName()); 384 } 385 catch (IllegalArgumentException ex) { 386 throw new WorkflowException(ErrorCode.E0724, ex.getMessage()); 387 } 388 } 389 if (node instanceof ActionNodeDef) { 390 try { 391 Element action = XmlUtils.parseXml(node.getConf()); 392 boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null; 393 if (!supportedAction) { 394 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName()); 395 } 396 } 397 catch (JDOMException ex) { 398 throw new RuntimeException("It should never happen, " + ex.getMessage(), ex); 399 } 400 } 401 402 if(node instanceof ForkNodeDef){ 403 forkList.add(node.getName()); 404 } 405 406 if(node instanceof JoinNodeDef){ 407 joinList.add(node.getName()); 408 } 409 410 if (node instanceof EndNodeDef) { 411 traversed.put(node.getName(), VisitStatus.VISITED); 412 return; 413 } 414 if (node instanceof KillNodeDef) { 415 traversed.put(node.getName(), VisitStatus.VISITED); 416 return; 417 } 418 for (String transition : node.getTransitions()) { 419 420 if (app.getNode(transition) == null) { 421 throw new WorkflowException(ErrorCode.E0708, node.getName(), transition); 422 } 423 424 //check if it is a cycle 425 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) { 426 throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName()); 427 } 428 //ignore validated one 429 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) { 430 continue; 431 } 432 433 traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING); 434 validate(app, app.getNode(transition), traversed); 435 } 436 traversed.put(node.getName(), VisitStatus.VISITED); 437 } 438 439 /** 440 * Handle the global section 441 * 442 * @param ns 443 * @param global 444 * @param eActionConf 445 * @throws WorkflowException 446 */ 447 448 private void handleGlobal(Namespace ns, Element global, Element eActionConf) throws WorkflowException { 449 450 // Use the action's namespace when getting children of the action (will be different than ns for extension actions) 451 Namespace actionNs = eActionConf.getNamespace(); 452 453 if (global != null) { 454 Element globalJobTracker = global.getChild("job-tracker", ns); 455 Element globalNameNode = global.getChild("name-node", ns); 456 List<Element> globalJobXml = global.getChildren("job-xml", ns); 457 Element globalConfiguration = global.getChild("configuration", ns); 458 459 if (globalJobTracker != null && eActionConf.getChild("job-tracker", actionNs) == null) { 460 Element jobTracker = new Element("job-tracker", actionNs); 461 jobTracker.setText(globalJobTracker.getText()); 462 eActionConf.addContent(jobTracker); 463 } 464 465 if (globalNameNode != null && eActionConf.getChild("name-node", actionNs) == null) { 466 Element nameNode = new Element("name-node", actionNs); 467 nameNode.setText(globalNameNode.getText()); 468 eActionConf.addContent(nameNode); 469 } 470 471 if (!globalJobXml.isEmpty()) { 472 List<Element> actionJobXml = eActionConf.getChildren("job-xml", actionNs); 473 for(Element jobXml: globalJobXml){ 474 boolean alreadyExists = false; 475 for(Element actionXml: actionJobXml){ 476 if(jobXml.getText().equals(actionXml.getText())){ 477 alreadyExists = true; 478 } 479 } 480 481 if (!alreadyExists){ 482 Element ejobXml = new Element("job-xml", actionNs); 483 ejobXml.setText(jobXml.getText()); 484 eActionConf.addContent(ejobXml); 485 } 486 487 } 488 } 489 490 if (globalConfiguration != null) { 491 Element actionConfiguration = eActionConf.getChild("configuration", actionNs); 492 if (actionConfiguration == null) { 493 actionConfiguration = new Element("configuration", actionNs); 494 eActionConf.addContent(actionConfiguration); 495 } 496 for (Element globalConfig : (List<Element>) globalConfiguration.getChildren()) { 497 boolean isSet = false; 498 String globalVarName = globalConfig.getChildText("name", ns); 499 for (Element local : (List<Element>) actionConfiguration.getChildren()) { 500 if (local.getChildText("name", actionNs).equals(globalVarName)) { 501 isSet = true; 502 } 503 } 504 if (!isSet) { 505 Element varToCopy = new Element("property", actionNs); 506 Element varName = new Element("name", actionNs); 507 Element varValue = new Element("value", actionNs); 508 509 varName.setText(globalConfig.getChildText("name", ns)); 510 varValue.setText(globalConfig.getChildText("value", ns)); 511 512 varToCopy.addContent(varName); 513 varToCopy.addContent(varValue); 514 515 actionConfiguration.addContent(varToCopy); 516 } 517 } 518 } 519 } 520 else { 521 ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(eActionConf.getName()); 522 if (ae == null) { 523 throw new WorkflowException(ErrorCode.E0723, "Unsupported action type"); 524 } 525 if (ae.requiresNNJT) { 526 527 if (eActionConf.getChild("name-node", actionNs) == null) { 528 throw new WorkflowException(ErrorCode.E0701, "No name-node defined"); 529 } 530 if (eActionConf.getChild("job-tracker", actionNs) == null) { 531 throw new WorkflowException(ErrorCode.E0701, "No job-tracker defined"); 532 } 533 } 534 } 535 } 536 537 }