001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.workflow.lite; 019 020 import org.apache.oozie.workflow.WorkflowException; 021 import org.apache.oozie.util.IOUtils; 022 import org.apache.oozie.util.XmlUtils; 023 import org.apache.oozie.util.ParamChecker; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.service.Services; 026 import org.apache.oozie.service.ActionService; 027 import org.jdom.Element; 028 import org.jdom.JDOMException; 029 import org.jdom.Namespace; 030 import org.xml.sax.SAXException; 031 032 import javax.xml.transform.stream.StreamSource; 033 import javax.xml.validation.Schema; 034 import javax.xml.validation.Validator; 035 import java.io.IOException; 036 import java.io.Reader; 037 import java.io.StringReader; 038 import java.io.StringWriter; 039 import java.util.ArrayList; 040 import java.util.HashMap; 041 import java.util.HashSet; 042 import java.util.List; 043 import java.util.Map; 044 import java.util.Set; 045 import java.util.Stack; 046 047 /** 048 * Class to parse and validate workflow xml 049 */ 050 public class LiteWorkflowAppParser { 051 052 private static final String DECISION_E = "decision"; 053 private static final String ACTION_E = "action"; 054 private static final String END_E = "end"; 055 private static final String START_E = "start"; 056 private static final String JOIN_E = "join"; 057 private static final String FORK_E = "fork"; 058 private static final Object KILL_E = "kill"; 059 060 private static final String SLA_INFO = "info"; 061 private static final String CREDENTIALS = "credentials"; 062 063 private static final String NAME_A = "name"; 064 private static final String CRED_A = "cred"; 065 private static final String USER_RETRY_MAX_A = "retry-max"; 066 private static final String USER_RETRY_INTERVAL_A = "retry-interval"; 067 private static final String TO_A = "to"; 068 069 private static final String FORK_PATH_E = "path"; 070 private static final String FORK_START_A = "start"; 071 072 private static final String ACTION_OK_E = "ok"; 073 private static final String ACTION_ERROR_E = "error"; 074 075 private static final String DECISION_SWITCH_E = "switch"; 076 private static final String DECISION_CASE_E = "case"; 077 private static final String DECISION_DEFAULT_E = "default"; 078 079 private static final String KILL_MESSAGE_E = "message"; 080 public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin"; 081 082 private Schema schema; 083 private Class<? extends DecisionNodeHandler> decisionHandlerClass; 084 private Class<? extends ActionNodeHandler> actionHandlerClass; 085 086 private static enum VisitStatus { 087 VISITING, VISITED 088 } 089 090 private List<String> forkList = new ArrayList<String>(); 091 private List<String> joinList = new ArrayList<String>(); 092 093 public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass, 094 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException { 095 this.schema = schema; 096 this.decisionHandlerClass = decisionHandlerClass; 097 this.actionHandlerClass = actionHandlerClass; 098 } 099 100 /** 101 * Parse and validate xml to {@link LiteWorkflowApp} 102 * 103 * @param reader 104 * @return LiteWorkflowApp 105 * @throws WorkflowException 106 */ 107 public LiteWorkflowApp validateAndParse(Reader reader) throws WorkflowException { 108 try { 109 StringWriter writer = new StringWriter(); 110 IOUtils.copyCharStream(reader, writer); 111 String strDef = writer.toString(); 112 113 if (schema != null) { 114 Validator validator = schema.newValidator(); 115 validator.validate(new StreamSource(new StringReader(strDef))); 116 } 117 118 Element wfDefElement = XmlUtils.parseXml(strDef); 119 LiteWorkflowApp app = parse(strDef, wfDefElement); 120 Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>(); 121 traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING); 122 validate(app, app.getNode(StartNodeDef.START), traversed); 123 //Validate whether fork/join are in pair or not 124 if (Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) { 125 validateForkJoin(app); 126 } 127 return app; 128 } 129 catch (JDOMException ex) { 130 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex); 131 } 132 catch (SAXException ex) { 133 throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex); 134 } 135 catch (IOException ex) { 136 throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex); 137 } 138 } 139 140 /** 141 * Validate whether fork/join are in pair or not 142 * @param app LiteWorkflowApp 143 * @throws WorkflowException 144 */ 145 private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException { 146 // Make sure the number of forks and joins in wf are equal 147 if (forkList.size() != joinList.size()) { 148 throw new WorkflowException(ErrorCode.E0730); 149 } 150 151 while(!forkList.isEmpty()){ 152 // Make sure each of the fork node has a corresponding join; start with the root fork Node first 153 validateFork(app.getNode(forkList.remove(0)), app); 154 } 155 156 } 157 158 /* 159 * Test whether the fork node has a corresponding join 160 * @param node - the fork node 161 * @param app - the WorkflowApp 162 * @return 163 * @throws WorkflowException 164 */ 165 private NodeDef validateFork(NodeDef forkNode, LiteWorkflowApp app) throws WorkflowException { 166 List<String> transitions = new ArrayList<String>(forkNode.getTransitions()); 167 // list for keeping track of "error-to" transitions of Action Node 168 List<String> errorToTransitions = new ArrayList<String>(); 169 String joinNode = null; 170 for (int i = 0; i < transitions.size(); i++) { 171 NodeDef node = app.getNode(transitions.get(i)); 172 if (node instanceof DecisionNodeDef) { 173 Set<String> decisionSet = new HashSet<String>(node.getTransitions()); 174 for (String ds : decisionSet) { 175 if (transitions.contains(ds)) { 176 throw new WorkflowException(ErrorCode.E0734, node.getName(), ds); 177 } else { 178 transitions.add(ds); 179 } 180 } 181 } else if (node instanceof ActionNodeDef) { 182 // Make sure the transition is valid 183 validateTransition(errorToTransitions, transitions, app, node); 184 // Add the "ok-to" transition of node 185 transitions.add(node.getTransitions().get(0)); 186 String errorTo = node.getTransitions().get(1); 187 // Add the "error-to" transition if the transition is a Action Node 188 if (app.getNode(errorTo) instanceof ActionNodeDef) { 189 errorToTransitions.add(errorTo); 190 } 191 } else if (node instanceof ForkNodeDef) { 192 forkList.remove(node.getName()); 193 // Make a recursive call to resolve this fork node 194 NodeDef joinNd = validateFork(node, app); 195 // Make sure the transition is valid 196 validateTransition(errorToTransitions, transitions, app, node); 197 // Add the "ok-to" transition of node 198 transitions.add(joinNd.getTransitions().get(0)); 199 } else if (node instanceof JoinNodeDef) { 200 // If joinNode encountered for the first time, remove it from the joinList and remember it 201 String currentJoin = node.getName(); 202 if (joinList.contains(currentJoin)) { 203 joinList.remove(currentJoin); 204 joinNode = currentJoin; 205 } else { 206 // Make sure this join is the same as the join seen from the first time 207 if (joinNode == null) { 208 throw new WorkflowException(ErrorCode.E0733, forkNode); 209 } 210 if (!joinNode.equals(currentJoin)) { 211 throw new WorkflowException(ErrorCode.E0732, forkNode, joinNode); 212 } 213 } 214 } else { 215 throw new WorkflowException(ErrorCode.E0730); 216 } 217 } 218 return app.getNode(joinNode); 219 220 } 221 222 private void validateTransition(List<String> errorToTransitions, List<String> transitions, LiteWorkflowApp app, NodeDef node) 223 throws WorkflowException { 224 for (String transition : node.getTransitions()) { 225 // Make sure the transition node is either a join node or is not already visited 226 if (transitions.contains(transition) && !(app.getNode(transition) instanceof JoinNodeDef)) { 227 throw new WorkflowException(ErrorCode.E0734, node.getName(), transition); 228 } 229 // Make sure the transition node is not the same as an already visited 'error-to' transition 230 if (errorToTransitions.contains(transition)) { 231 throw new WorkflowException(ErrorCode.E0735, node.getName(), transition); 232 } 233 } 234 235 } 236 237 /** 238 * Parse xml to {@link LiteWorkflowApp} 239 * 240 * @param strDef 241 * @param root 242 * @return LiteWorkflowApp 243 * @throws WorkflowException 244 */ 245 @SuppressWarnings({"unchecked", "ConstantConditions"}) 246 private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException { 247 Namespace ns = root.getNamespace(); 248 LiteWorkflowApp def = null; 249 for (Element eNode : (List<Element>) root.getChildren()) { 250 if (eNode.getName().equals(START_E)) { 251 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef, 252 new StartNodeDef(eNode.getAttributeValue(TO_A))); 253 } 254 else { 255 if (eNode.getName().equals(END_E)) { 256 def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A))); 257 } 258 else { 259 if (eNode.getName().equals(KILL_E)) { 260 def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), eNode.getChildText(KILL_MESSAGE_E, ns))); 261 } 262 else { 263 if (eNode.getName().equals(FORK_E)) { 264 List<String> paths = new ArrayList<String>(); 265 for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) { 266 paths.add(tran.getAttributeValue(FORK_START_A)); 267 } 268 def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), paths)); 269 } 270 else { 271 if (eNode.getName().equals(JOIN_E)) { 272 def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), eNode.getAttributeValue(TO_A))); 273 } 274 else { 275 if (eNode.getName().equals(DECISION_E)) { 276 Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns); 277 List<String> transitions = new ArrayList<String>(); 278 for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) { 279 transitions.add(e.getAttributeValue(TO_A)); 280 } 281 transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A)); 282 283 String switchStatement = XmlUtils.prettyPrint(eSwitch).toString(); 284 def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass, 285 transitions)); 286 } 287 else { 288 if (ACTION_E.equals(eNode.getName())) { 289 String[] transitions = new String[2]; 290 Element eActionConf = null; 291 for (Element elem : (List<Element>) eNode.getChildren()) { 292 if (ACTION_OK_E.equals(elem.getName())) { 293 transitions[0] = elem.getAttributeValue(TO_A); 294 } 295 else { 296 if (ACTION_ERROR_E.equals(elem.getName())) { 297 transitions[1] = elem.getAttributeValue(TO_A); 298 } 299 else { 300 if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) { 301 continue; 302 } 303 else { 304 eActionConf = elem; 305 } 306 } 307 } 308 } 309 310 String credStr = eNode.getAttributeValue(CRED_A); 311 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A); 312 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A); 313 314 String actionConf = XmlUtils.prettyPrint(eActionConf).toString(); 315 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass, 316 transitions[0], transitions[1], credStr, 317 userRetryMaxStr, userRetryIntervalStr)); 318 } 319 else { 320 if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) { 321 // No operation is required 322 } 323 else { 324 throw new WorkflowException(ErrorCode.E0703, eNode.getName()); 325 } 326 } 327 } 328 } 329 } 330 } 331 } 332 } 333 } 334 return def; 335 } 336 337 /** 338 * Validate workflow xml 339 * 340 * @param app 341 * @param node 342 * @param traversed 343 * @throws WorkflowException 344 */ 345 private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException { 346 if (!(node instanceof StartNodeDef)) { 347 try { 348 ParamChecker.validateActionName(node.getName()); 349 } 350 catch (IllegalArgumentException ex) { 351 throw new WorkflowException(ErrorCode.E0724, ex.getMessage()); 352 } 353 } 354 if (node instanceof ActionNodeDef) { 355 try { 356 Element action = XmlUtils.parseXml(node.getConf()); 357 boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null; 358 if (!supportedAction) { 359 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName()); 360 } 361 } 362 catch (JDOMException ex) { 363 throw new RuntimeException("It should never happen, " + ex.getMessage(), ex); 364 } 365 } 366 367 if(node instanceof ForkNodeDef){ 368 forkList.add(node.getName()); 369 } 370 371 if(node instanceof JoinNodeDef){ 372 joinList.add(node.getName()); 373 } 374 375 if (node instanceof EndNodeDef) { 376 traversed.put(node.getName(), VisitStatus.VISITED); 377 return; 378 } 379 if (node instanceof KillNodeDef) { 380 traversed.put(node.getName(), VisitStatus.VISITED); 381 return; 382 } 383 for (String transition : node.getTransitions()) { 384 385 if (app.getNode(transition) == null) { 386 throw new WorkflowException(ErrorCode.E0708, node.getName(), transition); 387 } 388 389 //check if it is a cycle 390 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) { 391 throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName()); 392 } 393 //ignore validated one 394 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) { 395 continue; 396 } 397 398 traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING); 399 validate(app, app.getNode(transition), traversed); 400 } 401 traversed.put(node.getName(), VisitStatus.VISITED); 402 } 403 }