/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.workflow.lite;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.hadoop.FsActionExecutor;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.ParameterVerifier;
import org.apache.oozie.util.ParameterVerifierException;
import org.apache.oozie.util.WritableUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowException;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
import org.xml.sax.SAXException;

import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.Validator;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

/**
 * Class to parse and validate workflow xml
 */
public class LiteWorkflowAppParser {

    private static final String DECISION_E = "decision";
    private static final String ACTION_E = "action";
    private static final String END_E = "end";
    private static final String START_E = "start";
    private static final String JOIN_E = "join";
    private static final String FORK_E = "fork";
    private static final String KILL_E = "kill";

    private static final String SLA_INFO = "info";
    private static final String CREDENTIALS = "credentials";
    private static final String GLOBAL = "global";
    private static final String PARAMETERS = "parameters";

    private static final String NAME_A = "name";
    private static final String CRED_A = "cred";
    private static final String USER_RETRY_MAX_A = "retry-max";
    private static final String USER_RETRY_INTERVAL_A = "retry-interval";
    private static final String TO_A = "to";
    private static final String USER_RETRY_POLICY_A = "retry-policy";

    private static final String FORK_PATH_E = "path";
    private static final String FORK_START_A = "start";

    private static final String ACTION_OK_E = "ok";
    private static final String ACTION_ERROR_E = "error";

    private static final String DECISION_SWITCH_E = "switch";
    private static final String DECISION_CASE_E = "case";
    private static final String DECISION_DEFAULT_E = "default";

    private static final String SUBWORKFLOW_E = "sub-workflow";

    private static final String KILL_MESSAGE_E = "message";
    public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
    public static final String WF_VALIDATE_FORK_JOIN = "oozie.wf.validate.ForkJoin";

    public static final String DEFAULT_NAME_NODE = "oozie.actions.default.name-node";
    public static final String DEFAULT_JOB_TRACKER = "oozie.actions.default.job-tracker";
    public static final String OOZIE_GLOBAL = "oozie.wf.globalconf";

    private static final String JOB_TRACKER = "job-tracker";
    private static final String NAME_NODE = "name-node";
    private static final String JOB_XML = "job-xml";
    private static final String CONFIGURATION = "configuration";

    private final Schema schema;
    private final Class<? extends ControlNodeHandler> controlNodeHandler;
    private final Class<? extends DecisionNodeHandler> decisionHandlerClass;
    private final Class<? extends ActionNodeHandler> actionHandlerClass;

    // Site-wide fallbacks read from ConfigurationService; null when unset or blank.
    private final String defaultNameNode;
    private final String defaultJobTracker;

    /**
     * Create a parser.
     *
     * @param schema schema used to validate the workflow definition; validation is skipped when {@code null}
     * @param controlNodeHandler handler class passed to the start/end/kill/fork/join node definitions
     * @param decisionHandlerClass handler class passed to the decision node definitions
     * @param actionHandlerClass handler class passed to the action node definitions
     * @throws WorkflowException declared for callers; not thrown by the code visible in this constructor
     */
    public LiteWorkflowAppParser(Schema schema,
                                 Class<? extends ControlNodeHandler> controlNodeHandler,
                                 Class<? extends DecisionNodeHandler> decisionHandlerClass,
                                 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
        this.schema = schema;
        this.controlNodeHandler = controlNodeHandler;
        this.decisionHandlerClass = decisionHandlerClass;
        this.actionHandlerClass = actionHandlerClass;

        // trimToNull == trim() followed by "empty means null", which is what the defaults need.
        this.defaultNameNode = StringUtils.trimToNull(ConfigurationService.get(DEFAULT_NAME_NODE));
        this.defaultJobTracker = StringUtils.trimToNull(ConfigurationService.get(DEFAULT_JOB_TRACKER));
    }

    /**
     * Parse and validate xml to {@link LiteWorkflowApp} without a default configuration.
     *
     * @param reader source of the workflow definition xml
     * @param jobConf job configuration
     * @return LiteWorkflowApp
     * @throws WorkflowException if the definition cannot be read, validated or parsed
     */
    public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException {
        return validateAndParse(reader, jobConf, null);
    }

    /**
     * Parse and validate xml to {@link LiteWorkflowApp}
     *
     * @param reader source of the workflow definition xml; it is read fully into memory
     * @param jobConf job configuration; used for parameter verification, EL resolution of the retry attributes,
     * the {@link #WF_VALIDATE_FORK_JOIN} flag and the serialized global section ({@link #OOZIE_GLOBAL})
     * @param configDefault default configuration merged into every non sub-workflow action, may be {@code null}
     * @return LiteWorkflowApp
     * @throws WorkflowException if parameter verification, schema validation, xml parsing or reading fails
     */
    public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf, Configuration configDefault)
            throws WorkflowException {
        try {
            StringWriter writer = new StringWriter();
            IOUtils.copyCharStream(reader, writer);
            String strDef = writer.toString();

            if (schema != null) {
                Validator schemaValidator = SchemaService.getValidator(schema);
                schemaValidator.validate(new StreamSource(new StringReader(strDef)));
            }

            Element wfDefElement = XmlUtils.parseXml(strDef);
            ParameterVerifier.verifyParameters(jobConf, wfDefElement);
            LiteWorkflowApp app = parse(strDef, wfDefElement, configDefault, jobConf);

            // Fork/join validation runs only when both the job-level and the server-level switch allow it.
            boolean validateForkJoin = jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true)
                    && ConfigurationService.getBoolean(VALIDATE_FORK_JOIN);

            LiteWorkflowValidator workflowValidator = new LiteWorkflowValidator();
            workflowValidator.validateWorkflow(app, validateForkJoin);

            return app;
        }
        catch (ParameterVerifierException ex) {
            throw new WorkflowException(ex);
        }
        catch (JDOMException ex) {
            throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
        }
        catch (SAXException ex) {
            throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
        }
    }

    /**
     * Parse xml to {@link LiteWorkflowApp}
     *
     * @param strDef the workflow definition as a string, stored in the returned app
     * @param root root element of the parsed definition
     * @param configDefault default configuration merged into non sub-workflow actions, may be {@code null}
     * @param jobConf job configuration; updated with the serialized global section when a sub-workflow action
     * contains a {@code propagate-configuration} element and global data is available
     * @return LiteWorkflowApp
     * @throws WorkflowException on an unknown top-level element or when an action cannot be processed
     */
    @SuppressWarnings({"unchecked"})
    private LiteWorkflowApp parse(String strDef, Element root, Configuration configDefault, Configuration jobConf)
            throws WorkflowException {
        Namespace ns = root.getNamespace();
        LiteWorkflowApp def = null;
        GlobalSectionData gData = jobConf.get(OOZIE_GLOBAL) == null ?
                null : getGlobalFromString(jobConf.get(OOZIE_GLOBAL));
        // The global section is written back into jobConf at most once per parse.
        boolean serializedGlobalConf = false;
        for (Element eNode : (List<Element>) root.getChildren()) {
            String nodeName = eNode.getName();
            if (START_E.equals(nodeName)) {
                def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
                        new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
            } else if (END_E.equals(nodeName)) {
                def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
            } else if (KILL_E.equals(nodeName)) {
                def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
                        eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
            } else if (FORK_E.equals(nodeName)) {
                def.addNode(parseFork(eNode, ns));
            } else if (JOIN_E.equals(nodeName)) {
                def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler,
                        eNode.getAttributeValue(TO_A)));
            } else if (DECISION_E.equals(nodeName)) {
                def.addNode(parseDecision(eNode, ns));
            } else if (ACTION_E.equals(nodeName)) {
                String[] transitions = new String[2];
                Element eActionConf = null;
                for (Element elem : (List<Element>) eNode.getChildren()) {
                    String elemName = elem.getName();
                    if (ACTION_OK_E.equals(elemName)) {
                        transitions[0] = elem.getAttributeValue(TO_A);
                    } else if (ACTION_ERROR_E.equals(elemName)) {
                        transitions[1] = elem.getAttributeValue(TO_A);
                    } else if (SLA_INFO.equals(elemName) || CREDENTIALS.equals(elemName)) {
                        continue;
                    } else {
                        if (!serializedGlobalConf && elemName.equals(SubWorkflowActionExecutor.ACTION_TYPE) &&
                                elem.getChild(("propagate-configuration"), ns) != null && gData != null) {
                            serializedGlobalConf = true;
                            jobConf.set(OOZIE_GLOBAL, getGlobalString(gData));
                        }
                        eActionConf = elem;
                        // Sub-workflow actions do not receive the default configuration.
                        Configuration defaults = SUBWORKFLOW_E.equals(elemName) ? null : configDefault;
                        handleDefaultsAndGlobal(gData, defaults, elem);
                    }
                }

                String credStr = eNode.getAttributeValue(CRED_A);
                String userRetryMaxStr = resolveRetryAttribute(eNode.getAttributeValue(USER_RETRY_MAX_A), jobConf);
                String userRetryIntervalStr =
                        resolveRetryAttribute(eNode.getAttributeValue(USER_RETRY_INTERVAL_A), jobConf);
                String userRetryPolicyStr =
                        resolveRetryAttribute(eNode.getAttributeValue(USER_RETRY_POLICY_A), jobConf);

                String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
                def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
                        transitions[0], transitions[1], credStr, userRetryMaxStr, userRetryIntervalStr,
                        userRetryPolicyStr));
            } else if (SLA_INFO.equals(nodeName) || CREDENTIALS.equals(nodeName)) {
                // No operation is required
            } else if (GLOBAL.equals(nodeName)) {
                if (jobConf.get(OOZIE_GLOBAL) != null) {
                    // Merge the global data carried in jobConf into this <global> element before parsing it.
                    gData = getGlobalFromString(jobConf.get(OOZIE_GLOBAL));
                    handleDefaultsAndGlobal(gData, null, eNode);
                }
                gData = parseGlobalSection(ns, eNode);
            } else if (PARAMETERS.equals(nodeName)) {
                // No operation is required
            } else {
                throw new WorkflowException(ErrorCode.E0703, nodeName);
            }
        }
        return def;
    }

    /**
     * Build a fork node from a {@code fork} element, collecting the {@code start} attribute of each
     * {@code path} child.
     */
    @SuppressWarnings("unchecked")
    private ForkNodeDef parseFork(Element eNode, Namespace ns) {
        List<String> paths = new ArrayList<String>();
        for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
            paths.add(tran.getAttributeValue(FORK_START_A));
        }
        return new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths);
    }

    /**
     * Build a decision node from a {@code decision} element; transitions are every {@code case} target in
     * document order followed by the {@code default} target.
     */
    @SuppressWarnings("unchecked")
    private DecisionNodeDef parseDecision(Element eNode, Namespace ns) {
        Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
        List<String> transitions = new ArrayList<String>();
        for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
            transitions.add(e.getAttributeValue(TO_A));
        }
        transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));

        String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
        return new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
                transitions);
    }

    /**
     * Resolve a retry attribute against the job configuration; empty or {@code null} values are returned as is.
     *
     * @throws WorkflowException wrapping any failure of the resolution, with the original exception as cause
     */
    private String resolveRetryAttribute(String value, Configuration jobConf) throws WorkflowException {
        if (StringUtils.isEmpty(value)) {
            return value;
        }
        try {
            return ELUtils.resolveAppName(value, jobConf);
        }
        catch (Exception e) {
            // Trailing Throwable keeps the cause, matching the catches in validateAndParse.
            throw new WorkflowException(ErrorCode.E0703, e.getMessage(), e);
        }
    }

    /**
     * Read the GlobalSectionData from Base64 string.
     *
     * @param globalStr Base64 encoding of the deflated data written by {@link #getGlobalString}
     * @return GlobalSectionData
     * @throws WorkflowException if the string cannot be decoded, inflated or read
     */
    private GlobalSectionData getGlobalFromString(String globalStr) throws WorkflowException {
        GlobalSectionData globalSectionData = new GlobalSectionData();
        Inflater inflater = new Inflater();
        try {
            byte[] data = Base64.decodeBase64(globalStr);
            DataInputStream ois = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(data), inflater));
            globalSectionData.readFields(ois);
            ois.close();
        } catch (Exception ex) {
            throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf", ex);
        } finally {
            // A caller-supplied Inflater is not released by closing the stream; free it explicitly.
            inflater.end();
        }
        return globalSectionData;
    }

    /**
     * Write the GlobalSectionData to a Base64 string.
     *
     * @param globalSectionData data to serialize
     * @return Base64 encoding of the deflated serialized data
     * @throws WorkflowException if writing fails
     */
    private String getGlobalString(GlobalSectionData globalSectionData) throws WorkflowException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Deflater deflater = new Deflater();
        try {
            DataOutputStream oos = new DataOutputStream(new DeflaterOutputStream(baos, deflater));
            globalSectionData.write(oos);
            // close() finishes the deflater stream so that baos holds the complete output.
            oos.close();
        } catch (IOException e) {
            throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf", e);
        } finally {
            // A caller-supplied Deflater is not released by closing the stream; free it explicitly.
            deflater.end();
        }
        return Base64.encodeBase64String(baos.toByteArray());
    }

    /**
     * Append a new child element with the given name, namespace and text to {@code parent}.
     */
    private void addChildElement(Element parent, Namespace ns, String childName, String childValue) {
        Element child = new Element(childName, ns);
        child.setText(childValue);
        parent.addContent(child);
    }

    /**
     * Values of a {@code global} section: job tracker, name node, job-xml entries and configuration.
     * Any of them may be {@code null}.
     */
    private static class GlobalSectionData implements Writable {
        String jobTracker;
        String nameNode;
        List<String> jobXmls;
        Configuration conf;

        public GlobalSectionData() {
        }

        public GlobalSectionData(String jobTracker, String nameNode, List<String> jobXmls, Configuration conf) {
            this.jobTracker = jobTracker;
            this.nameNode = nameNode;
            this.jobXmls = jobXmls;
            this.conf = conf;
        }

        /**
         * Write job tracker, name node, the job-xml count followed by each entry, and the configuration
         * as xml text (or a null string when there is no configuration).
         */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            WritableUtils.writeStr(dataOutput, jobTracker);
            WritableUtils.writeStr(dataOutput, nameNode);

            if (jobXmls != null && !jobXmls.isEmpty()) {
                dataOutput.writeInt(jobXmls.size());
                for (String content : jobXmls) {
                    WritableUtils.writeStr(dataOutput, content);
                }
            } else {
                dataOutput.writeInt(0);
            }
            if (conf != null) {
                WritableUtils.writeStr(dataOutput, XmlUtils.prettyPrint(conf).toString());
            } else {
                WritableUtils.writeStr(dataOutput, null);
            }
        }

        /**
         * Read the fields in the order used by {@link #write(DataOutput)}. Fields absent from the stream
         * are reset to {@code null} so that a reused instance does not keep values from an earlier read.
         */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            jobTracker = WritableUtils.readStr(dataInput);
            nameNode = WritableUtils.readStr(dataInput);

            jobXmls = null;
            int length = dataInput.readInt();
            if (length > 0) {
                jobXmls = new ArrayList<String>();
                for (int i = 0; i < length; i++) {
                    jobXmls.add(WritableUtils.readStr(dataInput));
                }
            }

            conf = null;
            String confString = WritableUtils.readStr(dataInput);
            if (confString != null) {
                conf = new XConfiguration(new StringReader(confString));
            }
        }
    }

    /**
     * Extract job tracker, name node, job-xml entries and configuration from a {@code global} element.
     *
     * @param ns namespace of the child elements
     * @param global the {@code global} element, may be {@code null}
     * @return the parsed data, or {@code null} when {@code global} is {@code null}
     * @throws WorkflowException if the configuration child cannot be read
     */
    private GlobalSectionData parseGlobalSection(Namespace ns, Element global) throws WorkflowException {
        if (global == null) {
            return null;
        }

        String globalJobTracker = null;
        Element globalJobTrackerElement = global.getChild(JOB_TRACKER, ns);
        if (globalJobTrackerElement != null) {
            globalJobTracker = globalJobTrackerElement.getValue();
        }

        String globalNameNode = null;
        Element globalNameNodeElement = global.getChild(NAME_NODE, ns);
        if (globalNameNodeElement != null) {
            globalNameNode = globalNameNodeElement.getValue();
        }

        List<String> globalJobXmls = null;
        @SuppressWarnings("unchecked")
        List<Element> globalJobXmlElements = global.getChildren(JOB_XML, ns);
        if (!globalJobXmlElements.isEmpty()) {
            globalJobXmls = new ArrayList<String>(globalJobXmlElements.size());
            for (Element jobXmlElement : globalJobXmlElements) {
                globalJobXmls.add(jobXmlElement.getText());
            }
        }

        Configuration globalConf = null;
        Element globalConfigurationElement = global.getChild(CONFIGURATION, ns);
        if (globalConfigurationElement != null) {
            try {
                globalConf = new XConfiguration(
                        new StringReader(XmlUtils.prettyPrint(globalConfigurationElement).toString()));
            } catch (IOException ioe) {
                throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf", ioe);
            }
        }
        return new GlobalSectionData(globalJobTracker, globalNameNode, globalJobXmls, globalConf);
    }

    /**
     * Fill in missing {@code name-node}, {@code job-tracker}, {@code job-xml} and {@code configuration}
     * children of an action (or of the {@code global} element itself) from the global section data, the
     * default configuration and the site-wide defaults. The element is modified in place.
     *
     * @param gData global section data, may be {@code null}
     * @param configDefault default configuration copied in before the global and action values, may be {@code null}
     * @param actionElement the action type element (for example the child of {@code action}) or the
     * {@code global} element
     * @throws WorkflowException if no executor is registered for the element, a required name node or
     * job tracker is missing, or the configuration cannot be processed
     */
    private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement)
            throws WorkflowException {

        String elementName = actionElement.getName();
        ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(elementName);
        boolean isGlobal = GLOBAL.equals(elementName);
        if (ae == null && !isGlobal) {
            throw new WorkflowException(ErrorCode.E0723, elementName, ActionService.class.getName());
        }

        Namespace actionNs = actionElement.getNamespace();
        boolean isSubWorkflow = SubWorkflowActionExecutor.ACTION_TYPE.equals(elementName);
        boolean isFs = FsActionExecutor.ACTION_TYPE.equals(elementName);

        // If this is the global section or ActionExecutor.requiresNameNodeJobTracker() returns true, we parse the action's
        // <name-node> and <job-tracker> fields.  If those aren't defined, we take them from the <global> section.  If those
        // aren't defined, we take them from the oozie-site defaults.  If those aren't defined, we throw a WorkflowException.
        // However, for the SubWorkflow and FS Actions, as well as the <global> section, we don't throw the WorkflowException.
        // Also, we only parse the NN (not the JT) for the FS Action.
        // NOTE: ae may be null for <global>; isGlobal must be tested before ae is dereferenced.
        if (isSubWorkflow || isFs || isGlobal || ae.requiresNameNodeJobTracker()) {
            if (actionElement.getChild(NAME_NODE, actionNs) == null) {
                if (gData != null && gData.nameNode != null) {
                    addChildElement(actionElement, actionNs, NAME_NODE, gData.nameNode);
                } else if (defaultNameNode != null) {
                    addChildElement(actionElement, actionNs, NAME_NODE, defaultNameNode);
                } else if (!(isSubWorkflow || isFs || isGlobal)) {
                    throw new WorkflowException(ErrorCode.E0701, "No " + NAME_NODE + " defined");
                }
            }
            if (actionElement.getChild(JOB_TRACKER, actionNs) == null && !isFs) {
                if (gData != null && gData.jobTracker != null) {
                    addChildElement(actionElement, actionNs, JOB_TRACKER, gData.jobTracker);
                } else if (defaultJobTracker != null) {
                    addChildElement(actionElement, actionNs, JOB_TRACKER, defaultJobTracker);
                } else if (!(isSubWorkflow || isGlobal)) {
                    throw new WorkflowException(ErrorCode.E0701, "No " + JOB_TRACKER + " defined");
                }
            }
        }

        // If this is the global section or ActionExecutor.supportsConfigurationJobXML() returns true, we parse the action's
        // <configuration> and <job-xml> fields.  We also merge this with those from the <global> section, if given.  If none are
        // defined, empty values are placed.  Exceptions are thrown if there's an error parsing, but not if they're not given.
        if (isGlobal || ae.supportsConfigurationJobXML()) {
            addGlobalJobXmls(gData, actionElement, actionNs);
            mergeConfiguration(gData, configDefault, actionElement, actionNs);
        }
    }

    /**
     * Append every global {@code job-xml} entry whose text is not already present among the element's
     * {@code job-xml} children.
     */
    private void addGlobalJobXmls(GlobalSectionData gData, Element actionElement, Namespace actionNs) {
        if (gData == null || gData.jobXmls == null) {
            return;
        }
        @SuppressWarnings("unchecked")
        List<Element> actionJobXmls = actionElement.getChildren(JOB_XML, actionNs);
        for (String gJobXml : gData.jobXmls) {
            boolean alreadyExists = false;
            for (Element actionXml : actionJobXmls) {
                if (gJobXml.equals(actionXml.getText())) {
                    alreadyExists = true;
                    break;
                }
            }
            if (!alreadyExists) {
                Element ejobXml = new Element(JOB_XML, actionNs);
                ejobXml.setText(gJobXml);
                actionElement.addContent(ejobXml);
            }
        }
    }

    /**
     * Replace the element's {@code configuration} child with the merge of, in increasing precedence, the
     * default configuration, the global configuration and the element's own configuration. When the
     * element had no {@code configuration} child the merged one is appended.
     *
     * @throws WorkflowException if the configuration cannot be read or re-parsed
     */
    private void mergeConfiguration(GlobalSectionData gData, Configuration configDefault, Element actionElement,
                                    Namespace actionNs) throws WorkflowException {
        try {
            XConfiguration actionConf = new XConfiguration();
            if (configDefault != null) {
                XConfiguration.copy(configDefault, actionConf);
            }
            if (gData != null && gData.conf != null) {
                XConfiguration.copy(gData.conf, actionConf);
            }
            Element actionConfiguration = actionElement.getChild(CONFIGURATION, actionNs);
            if (actionConfiguration != null) {
                //copy and override
                XConfiguration.copy(
                        new XConfiguration(new StringReader(XmlUtils.prettyPrint(actionConfiguration).toString())),
                        actionConf);
            }
            int position = actionElement.indexOf(actionConfiguration);
            actionElement.removeContent(actionConfiguration); //replace with enhanced one
            Element eConfXml = XmlUtils.parseXml(actionConf.toXmlString(false));
            eConfXml.detach();
            eConfXml.setNamespace(actionNs);
            // indexOf yields -1 when there was no <configuration>; index 0 is a valid position and must
            // be honoured so the merged element stays where the original one was.
            if (position >= 0) {
                actionElement.addContent(position, eConfXml);
            }
            else {
                actionElement.addContent(eConfXml);
            }
        }
        catch (IOException e) {
            throw new WorkflowException(ErrorCode.E0700, "Error while processing action conf", e);
        }
        catch (JDOMException e) {
            throw new WorkflowException(ErrorCode.E0700, "Error while processing action conf", e);
        }
    }
}