This project has retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.workflow.lite;
019
020 import org.apache.oozie.workflow.WorkflowException;
021 import org.apache.oozie.util.IOUtils;
022 import org.apache.oozie.util.XmlUtils;
023 import org.apache.oozie.util.ParamChecker;
024 import org.apache.oozie.util.ParameterVerifier;
025 import org.apache.oozie.util.ParameterVerifierException;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.action.ActionExecutor;
028 import org.apache.oozie.service.Services;
029 import org.apache.oozie.service.ActionService;
030 import org.apache.hadoop.conf.Configuration;
031 import org.jdom.Element;
032 import org.jdom.JDOMException;
033 import org.jdom.Namespace;
034 import org.xml.sax.SAXException;
035
036 import javax.xml.transform.stream.StreamSource;
037 import javax.xml.validation.Schema;
038 import javax.xml.validation.Validator;
039 import java.io.IOException;
040 import java.io.Reader;
041 import java.io.StringReader;
042 import java.io.StringWriter;
043 import java.util.ArrayList;
044 import java.util.Arrays;
045 import java.util.Deque;
046 import java.util.HashMap;
047 import java.util.HashSet;
048 import java.util.LinkedList;
049 import java.util.List;
050 import java.util.Map;
051
052 /**
053 * Class to parse and validate workflow xml
054 */
055 public class LiteWorkflowAppParser {
056
057 private static final String DECISION_E = "decision";
058 private static final String ACTION_E = "action";
059 private static final String END_E = "end";
060 private static final String START_E = "start";
061 private static final String JOIN_E = "join";
062 private static final String FORK_E = "fork";
063 private static final Object KILL_E = "kill";
064
065 private static final String SLA_INFO = "info";
066 private static final String CREDENTIALS = "credentials";
067 private static final String GLOBAL = "global";
068 private static final String PARAMETERS = "parameters";
069
070 private static final String NAME_A = "name";
071 private static final String CRED_A = "cred";
072 private static final String USER_RETRY_MAX_A = "retry-max";
073 private static final String USER_RETRY_INTERVAL_A = "retry-interval";
074 private static final String TO_A = "to";
075
076 private static final String FORK_PATH_E = "path";
077 private static final String FORK_START_A = "start";
078
079 private static final String ACTION_OK_E = "ok";
080 private static final String ACTION_ERROR_E = "error";
081
082 private static final String DECISION_SWITCH_E = "switch";
083 private static final String DECISION_CASE_E = "case";
084 private static final String DECISION_DEFAULT_E = "default";
085
086 private static final String KILL_MESSAGE_E = "message";
087 public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
088 public static final String WF_VALIDATE_FORK_JOIN = "oozie.wf.validate.ForkJoin";
089
090 private Schema schema;
091 private Class<? extends ControlNodeHandler> controlNodeHandler;
092 private Class<? extends DecisionNodeHandler> decisionHandlerClass;
093 private Class<? extends ActionNodeHandler> actionHandlerClass;
094
095 private static enum VisitStatus {
096 VISITING, VISITED
097 }
098
099 private List<String> forkList = new ArrayList<String>();
100 private List<String> joinList = new ArrayList<String>();
101 private StartNodeDef startNode;
102 private List<String> visitedOkNodes = new ArrayList<String>();
103 private List<String> visitedJoinNodes = new ArrayList<String>();
104
105 public LiteWorkflowAppParser(Schema schema,
106 Class<? extends ControlNodeHandler> controlNodeHandler,
107 Class<? extends DecisionNodeHandler> decisionHandlerClass,
108 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
109 this.schema = schema;
110 this.controlNodeHandler = controlNodeHandler;
111 this.decisionHandlerClass = decisionHandlerClass;
112 this.actionHandlerClass = actionHandlerClass;
113 }
114
115 /**
116 * Parse and validate xml to {@link LiteWorkflowApp}
117 *
118 * @param reader
119 * @return LiteWorkflowApp
120 * @throws WorkflowException
121 */
122 public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException {
123 try {
124 StringWriter writer = new StringWriter();
125 IOUtils.copyCharStream(reader, writer);
126 String strDef = writer.toString();
127
128 if (schema != null) {
129 Validator validator = schema.newValidator();
130 validator.validate(new StreamSource(new StringReader(strDef)));
131 }
132
133 Element wfDefElement = XmlUtils.parseXml(strDef);
134 ParameterVerifier.verifyParameters(jobConf, wfDefElement);
135 LiteWorkflowApp app = parse(strDef, wfDefElement);
136 Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
137 traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
138 validate(app, app.getNode(StartNodeDef.START), traversed);
139 //Validate whether fork/join are in pair or not
140 if (jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true) && Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
141 validateForkJoin(app);
142 }
143 return app;
144 }
145 catch (ParameterVerifierException ex) {
146 throw new WorkflowException(ex);
147 }
148 catch (JDOMException ex) {
149 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
150 }
151 catch (SAXException ex) {
152 throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
153 }
154 catch (IOException ex) {
155 throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
156 }
157 }
158
159 /**
160 * Validate whether fork/join are in pair or not
161 * @param app LiteWorkflowApp
162 * @throws WorkflowException
163 */
164 private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
165 // Make sure the number of forks and joins in wf are equal
166 if (forkList.size() != joinList.size()) {
167 throw new WorkflowException(ErrorCode.E0730);
168 }
169
170 // No need to bother going through all of this if there are no fork/join nodes
171 if (!forkList.isEmpty()) {
172 visitedOkNodes.clear();
173 visitedJoinNodes.clear();
174 validateForkJoin(startNode, app, new LinkedList<String>(), new LinkedList<String>(), new LinkedList<String>(), true);
175 }
176 }
177
178 /*
179 * Recursively walk through the DAG and make sure that all fork paths are valid.
180 * This should be called from validateForkJoin(LiteWorkflowApp app). It assumes that visitedOkNodes and visitedJoinNodes are
181 * both empty ArrayLists on the first call.
182 *
183 * @param node the current node; use the startNode on the first call
184 * @param app the WorkflowApp
185 * @param forkNodes a stack of the current fork nodes
186 * @param joinNodes a stack of the current join nodes
187 * @param path a stack of the current path
188 * @param okTo false if node (or an ancestor of node) was gotten to via an "error to" transition or via a join node that has
189 * already been visited at least once before
190 * @throws WorkflowException
191 */
192 private void validateForkJoin(NodeDef node, LiteWorkflowApp app, Deque<String> forkNodes, Deque<String> joinNodes,
193 Deque<String> path, boolean okTo) throws WorkflowException {
194 if (path.contains(node.getName())) {
195 // cycle
196 throw new WorkflowException(ErrorCode.E0741, node.getName(), Arrays.toString(path.toArray()));
197 }
198 path.push(node.getName());
199
200 // Make sure that we're not revisiting a node (that's not a Kill, Join, or End type) that's been visited before from an
201 // "ok to" transition; if its from an "error to" transition, then its okay to visit it multiple times. Also, because we
202 // traverse through join nodes multiple times, we have to make sure not to throw an exception here when we're really just
203 // re-walking the same execution path (this is why we need the visitedJoinNodes list used later)
204 if (okTo && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) {
205 if (visitedOkNodes.contains(node.getName())) {
206 throw new WorkflowException(ErrorCode.E0743, node.getName());
207 }
208 visitedOkNodes.add(node.getName());
209 }
210
211 if (node instanceof StartNodeDef) {
212 String transition = node.getTransitions().get(0); // start always has only 1 transition
213 NodeDef tranNode = app.getNode(transition);
214 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
215 }
216 else if (node instanceof ActionNodeDef) {
217 String transition = node.getTransitions().get(0); // "ok to" transition
218 NodeDef tranNode = app.getNode(transition);
219 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo); // propogate okTo
220 transition = node.getTransitions().get(1); // "error to" transition
221 tranNode = app.getNode(transition);
222 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false); // use false
223 }
224 else if (node instanceof DecisionNodeDef) {
225 for(String transition : (new HashSet<String>(node.getTransitions()))) {
226 NodeDef tranNode = app.getNode(transition);
227 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
228 }
229 }
230 else if (node instanceof ForkNodeDef) {
231 forkNodes.push(node.getName());
232 for(String transition : (new HashSet<String>(node.getTransitions()))) {
233 NodeDef tranNode = app.getNode(transition);
234 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
235 }
236 forkNodes.pop();
237 if (!joinNodes.isEmpty()) {
238 joinNodes.pop();
239 }
240 }
241 else if (node instanceof JoinNodeDef) {
242 if (forkNodes.isEmpty()) {
243 // no fork for join to match with
244 throw new WorkflowException(ErrorCode.E0742, node.getName());
245 }
246 if (forkNodes.size() > joinNodes.size() && (joinNodes.isEmpty() || !joinNodes.peek().equals(node.getName()))) {
247 joinNodes.push(node.getName());
248 }
249 if (!joinNodes.peek().equals(node.getName())) {
250 // join doesn't match fork
251 throw new WorkflowException(ErrorCode.E0732, forkNodes.peek(), node.getName(), joinNodes.peek());
252 }
253 joinNodes.pop();
254 String currentForkNode = forkNodes.pop();
255 String transition = node.getTransitions().get(0); // join always has only 1 transition
256 NodeDef tranNode = app.getNode(transition);
257 // If we're already under a situation where okTo is false, use false (propogate it)
258 // Or if we've already visited this join node, use false (because we've already traversed this path before and we don't
259 // want to throw an exception from the check against visitedOkNodes)
260 if (!okTo || visitedJoinNodes.contains(node.getName())) {
261 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false);
262 // Else, use true because this is either the first time we've gone through this join node or okTo was already false
263 } else {
264 visitedJoinNodes.add(node.getName());
265 validateForkJoin(tranNode, app, forkNodes, joinNodes, path, true);
266 }
267 forkNodes.push(currentForkNode);
268 joinNodes.push(node.getName());
269 }
270 else if (node instanceof KillNodeDef) {
271 // do nothing
272 }
273 else if (node instanceof EndNodeDef) {
274 if (!forkNodes.isEmpty()) {
275 path.pop(); // = node
276 String parent = path.peek();
277 // can't go to an end node in a fork
278 throw new WorkflowException(ErrorCode.E0737, parent, node.getName());
279 }
280 }
281 else {
282 // invalid node type (shouldn't happen)
283 throw new WorkflowException(ErrorCode.E0740, node.getName());
284 }
285 path.pop();
286 }
287
288 /**
289 * Parse xml to {@link LiteWorkflowApp}
290 *
291 * @param strDef
292 * @param root
293 * @return LiteWorkflowApp
294 * @throws WorkflowException
295 */
296 @SuppressWarnings({"unchecked", "ConstantConditions"})
297 private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException {
298 Namespace ns = root.getNamespace();
299 LiteWorkflowApp def = null;
300 Element global = null;
301 for (Element eNode : (List<Element>) root.getChildren()) {
302 if (eNode.getName().equals(START_E)) {
303 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
304 new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
305 }
306 else {
307 if (eNode.getName().equals(END_E)) {
308 def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
309 }
310 else {
311 if (eNode.getName().equals(KILL_E)) {
312 def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
313 eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
314 }
315 else {
316 if (eNode.getName().equals(FORK_E)) {
317 List<String> paths = new ArrayList<String>();
318 for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
319 paths.add(tran.getAttributeValue(FORK_START_A));
320 }
321 def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths));
322 }
323 else {
324 if (eNode.getName().equals(JOIN_E)) {
325 def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler,
326 eNode.getAttributeValue(TO_A)));
327 }
328 else {
329 if (eNode.getName().equals(DECISION_E)) {
330 Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
331 List<String> transitions = new ArrayList<String>();
332 for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
333 transitions.add(e.getAttributeValue(TO_A));
334 }
335 transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
336
337 String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
338 def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
339 transitions));
340 }
341 else {
342 if (ACTION_E.equals(eNode.getName())) {
343 String[] transitions = new String[2];
344 Element eActionConf = null;
345 for (Element elem : (List<Element>) eNode.getChildren()) {
346 if (ACTION_OK_E.equals(elem.getName())) {
347 transitions[0] = elem.getAttributeValue(TO_A);
348 }
349 else {
350 if (ACTION_ERROR_E.equals(elem.getName())) {
351 transitions[1] = elem.getAttributeValue(TO_A);
352 }
353 else {
354 if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
355 continue;
356 }
357 else {
358 eActionConf = elem;
359 handleGlobal(ns, global, elem);
360 }
361 }
362 }
363 }
364
365 String credStr = eNode.getAttributeValue(CRED_A);
366 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
367 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
368
369 String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
370 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
371 transitions[0], transitions[1], credStr,
372 userRetryMaxStr, userRetryIntervalStr));
373 }
374 else {
375 if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
376 // No operation is required
377 }
378 else {
379 if (eNode.getName().equals(GLOBAL)) {
380 global = eNode;
381 }
382 else {
383 if (eNode.getName().equals(PARAMETERS)) {
384 // No operation is required
385 }
386 else {
387 throw new WorkflowException(ErrorCode.E0703, eNode.getName());
388 }
389 }
390 }
391 }
392 }
393 }
394 }
395 }
396 }
397 }
398 }
399 return def;
400 }
401
402 /**
403 * Validate workflow xml
404 *
405 * @param app
406 * @param node
407 * @param traversed
408 * @throws WorkflowException
409 */
410 private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
411 if (node instanceof StartNodeDef) {
412 startNode = (StartNodeDef) node;
413 }
414 else {
415 try {
416 ParamChecker.validateActionName(node.getName());
417 }
418 catch (IllegalArgumentException ex) {
419 throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
420 }
421 }
422 if (node instanceof ActionNodeDef) {
423 try {
424 Element action = XmlUtils.parseXml(node.getConf());
425 boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
426 if (!supportedAction) {
427 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
428 }
429 }
430 catch (JDOMException ex) {
431 throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
432 }
433 }
434
435 if(node instanceof ForkNodeDef){
436 forkList.add(node.getName());
437 }
438
439 if(node instanceof JoinNodeDef){
440 joinList.add(node.getName());
441 }
442
443 if (node instanceof EndNodeDef) {
444 traversed.put(node.getName(), VisitStatus.VISITED);
445 return;
446 }
447 if (node instanceof KillNodeDef) {
448 traversed.put(node.getName(), VisitStatus.VISITED);
449 return;
450 }
451 for (String transition : node.getTransitions()) {
452
453 if (app.getNode(transition) == null) {
454 throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
455 }
456
457 //check if it is a cycle
458 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
459 throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
460 }
461 //ignore validated one
462 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
463 continue;
464 }
465
466 traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
467 validate(app, app.getNode(transition), traversed);
468 }
469 traversed.put(node.getName(), VisitStatus.VISITED);
470 }
471
472 /**
473 * Handle the global section
474 *
475 * @param ns
476 * @param global
477 * @param eActionConf
478 * @throws WorkflowException
479 */
480
481 private void handleGlobal(Namespace ns, Element global, Element eActionConf) throws WorkflowException {
482
483 // Use the action's namespace when getting children of the action (will be different than ns for extension actions)
484 Namespace actionNs = eActionConf.getNamespace();
485
486 if (global != null) {
487 Element globalJobTracker = global.getChild("job-tracker", ns);
488 Element globalNameNode = global.getChild("name-node", ns);
489 List<Element> globalJobXml = global.getChildren("job-xml", ns);
490 Element globalConfiguration = global.getChild("configuration", ns);
491
492 if (globalJobTracker != null && eActionConf.getChild("job-tracker", actionNs) == null) {
493 Element jobTracker = new Element("job-tracker", actionNs);
494 jobTracker.setText(globalJobTracker.getText());
495 eActionConf.addContent(jobTracker);
496 }
497
498 if (globalNameNode != null && eActionConf.getChild("name-node", actionNs) == null) {
499 Element nameNode = new Element("name-node", actionNs);
500 nameNode.setText(globalNameNode.getText());
501 eActionConf.addContent(nameNode);
502 }
503
504 if (!globalJobXml.isEmpty()) {
505 List<Element> actionJobXml = eActionConf.getChildren("job-xml", actionNs);
506 for(Element jobXml: globalJobXml){
507 boolean alreadyExists = false;
508 for(Element actionXml: actionJobXml){
509 if(jobXml.getText().equals(actionXml.getText())){
510 alreadyExists = true;
511 }
512 }
513
514 if (!alreadyExists){
515 Element ejobXml = new Element("job-xml", actionNs);
516 ejobXml.setText(jobXml.getText());
517 eActionConf.addContent(ejobXml);
518 }
519
520 }
521 }
522
523 if (globalConfiguration != null) {
524 Element actionConfiguration = eActionConf.getChild("configuration", actionNs);
525 if (actionConfiguration == null) {
526 actionConfiguration = new Element("configuration", actionNs);
527 eActionConf.addContent(actionConfiguration);
528 }
529 for (Element globalConfig : (List<Element>) globalConfiguration.getChildren()) {
530 boolean isSet = false;
531 String globalVarName = globalConfig.getChildText("name", ns);
532 for (Element local : (List<Element>) actionConfiguration.getChildren()) {
533 if (local.getChildText("name", actionNs).equals(globalVarName)) {
534 isSet = true;
535 }
536 }
537 if (!isSet) {
538 Element varToCopy = new Element("property", actionNs);
539 Element varName = new Element("name", actionNs);
540 Element varValue = new Element("value", actionNs);
541
542 varName.setText(globalConfig.getChildText("name", ns));
543 varValue.setText(globalConfig.getChildText("value", ns));
544
545 varToCopy.addContent(varName);
546 varToCopy.addContent(varValue);
547
548 actionConfiguration.addContent(varToCopy);
549 }
550 }
551 }
552 }
553 else {
554 ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(eActionConf.getName());
555 if (ae == null) {
556 throw new WorkflowException(ErrorCode.E0723, eActionConf.getName(), ActionService.class.getName());
557 }
558 if (ae.requiresNNJT) {
559
560 if (eActionConf.getChild("name-node", actionNs) == null) {
561 throw new WorkflowException(ErrorCode.E0701, "No name-node defined");
562 }
563 if (eActionConf.getChild("job-tracker", actionNs) == null) {
564 throw new WorkflowException(ErrorCode.E0701, "No job-tracker defined");
565 }
566 }
567 }
568 }
569
570 }