This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.workflow.lite;
019
020 import org.apache.oozie.workflow.WorkflowException;
021 import org.apache.oozie.util.IOUtils;
022 import org.apache.oozie.util.XmlUtils;
023 import org.apache.oozie.util.ParamChecker;
024 import org.apache.oozie.util.ParameterVerifier;
025 import org.apache.oozie.util.ParameterVerifierException;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.action.ActionExecutor;
028 import org.apache.oozie.service.Services;
029 import org.apache.oozie.service.ActionService;
030 import org.apache.hadoop.conf.Configuration;
031 import org.jdom.Element;
032 import org.jdom.JDOMException;
033 import org.jdom.Namespace;
034 import org.xml.sax.SAXException;
035
036 import javax.xml.transform.stream.StreamSource;
037 import javax.xml.validation.Schema;
038 import javax.xml.validation.Validator;
039 import java.io.IOException;
040 import java.io.Reader;
041 import java.io.StringReader;
042 import java.io.StringWriter;
043 import java.util.ArrayList;
044 import java.util.HashMap;
045 import java.util.HashSet;
046 import java.util.List;
047 import java.util.Map;
048
049 /**
050 * Class to parse and validate workflow xml
051 */
052 public class LiteWorkflowAppParser {
053
054 private static final String DECISION_E = "decision";
055 private static final String ACTION_E = "action";
056 private static final String END_E = "end";
057 private static final String START_E = "start";
058 private static final String JOIN_E = "join";
059 private static final String FORK_E = "fork";
060 private static final Object KILL_E = "kill";
061
062 private static final String SLA_INFO = "info";
063 private static final String CREDENTIALS = "credentials";
064 private static final String GLOBAL = "global";
065 private static final String PARAMETERS = "parameters";
066
067 private static final String NAME_A = "name";
068 private static final String CRED_A = "cred";
069 private static final String USER_RETRY_MAX_A = "retry-max";
070 private static final String USER_RETRY_INTERVAL_A = "retry-interval";
071 private static final String TO_A = "to";
072
073 private static final String FORK_PATH_E = "path";
074 private static final String FORK_START_A = "start";
075
076 private static final String ACTION_OK_E = "ok";
077 private static final String ACTION_ERROR_E = "error";
078
079 private static final String DECISION_SWITCH_E = "switch";
080 private static final String DECISION_CASE_E = "case";
081 private static final String DECISION_DEFAULT_E = "default";
082
083 private static final String KILL_MESSAGE_E = "message";
084 public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
085
086 private Schema schema;
087 private Class<? extends ControlNodeHandler> controlNodeHandler;
088 private Class<? extends DecisionNodeHandler> decisionHandlerClass;
089 private Class<? extends ActionNodeHandler> actionHandlerClass;
090
091 private static enum VisitStatus {
092 VISITING, VISITED
093 }
094
095 private List<String> forkList = new ArrayList<String>();
096 private List<String> joinList = new ArrayList<String>();
097
/**
 * Creates a parser that uses the given schema and node handler classes.
 *
 * @param schema schema stored for later validation of workflow definitions
 * @param controlNodeHandler handler class passed to the control nodes (start, end, kill, fork, join) built by this parser
 * @param decisionHandlerClass handler class passed to the decision nodes built by this parser
 * @param actionHandlerClass handler class passed to the action nodes built by this parser
 * @throws WorkflowException declared for callers; nothing in this constructor body raises it
 */
public LiteWorkflowAppParser(
        Schema schema,
        Class<? extends ControlNodeHandler> controlNodeHandler,
        Class<? extends DecisionNodeHandler> decisionHandlerClass,
        Class<? extends ActionNodeHandler> actionHandlerClass)
        throws WorkflowException {
    // The four assignments are independent of one another; handlers first, schema last.
    this.controlNodeHandler = controlNodeHandler;
    this.decisionHandlerClass = decisionHandlerClass;
    this.actionHandlerClass = actionHandlerClass;
    this.schema = schema;
}
107
108 /**
109 * Parse and validate xml to {@link LiteWorkflowApp}
110 *
111 * @param reader
112 * @return LiteWorkflowApp
113 * @throws WorkflowException
114 */
115 public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException {
116 try {
117 StringWriter writer = new StringWriter();
118 IOUtils.copyCharStream(reader, writer);
119 String strDef = writer.toString();
120
121 if (schema != null) {
122 Validator validator = schema.newValidator();
123 validator.validate(new StreamSource(new StringReader(strDef)));
124 }
125
126 Element wfDefElement = XmlUtils.parseXml(strDef);
127 ParameterVerifier.verifyParameters(jobConf, wfDefElement);
128 LiteWorkflowApp app = parse(strDef, wfDefElement);
129 Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
130 traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
131 validate(app, app.getNode(StartNodeDef.START), traversed);
132 //Validate whether fork/join are in pair or not
133 if (Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
134 validateForkJoin(app);
135 }
136 return app;
137 }
138 catch (ParameterVerifierException ex) {
139 throw new WorkflowException(ex);
140 }
141 catch (JDOMException ex) {
142 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
143 }
144 catch (SAXException ex) {
145 throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
146 }
147 catch (IOException ex) {
148 throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
149 }
150 }
151
152 /**
153 * Validate whether fork/join are in pair or not
154 * @param app LiteWorkflowApp
155 * @throws WorkflowException
156 */
157 private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
158 // Make sure the number of forks and joins in wf are equal
159 if (forkList.size() != joinList.size()) {
160 throw new WorkflowException(ErrorCode.E0730);
161 }
162
163 while(!forkList.isEmpty()){
164 // Make sure each of the fork node has a corresponding join; start with the root fork Node first
165 validateFork(app.getNode(forkList.remove(0)), app);
166 }
167
168 }
169
170 /*
171 * Test whether the fork node has a corresponding join
172 * @param node - the fork node
173 * @param app - the WorkflowApp
174 * @return
175 * @throws WorkflowException
176 */
177 private NodeDef validateFork(NodeDef forkNode, LiteWorkflowApp app) throws WorkflowException {
178 List<String> transitions = new ArrayList<String>(forkNode.getTransitions());
179 // list for keeping track of "error-to" transitions of Action Node
180 List<String> errorToTransitions = new ArrayList<String>();
181 String joinNode = null;
182 for (int i = 0; i < transitions.size(); i++) {
183 NodeDef node = app.getNode(transitions.get(i));
184 if (node instanceof DecisionNodeDef) {
185 // Make sure the transition is valid
186 validateTransition(errorToTransitions, transitions, app, node);
187 // Add each transition to transitions (once) if they are not a kill node
188 HashSet<String> decisionSet = new HashSet<String>(node.getTransitions());
189 for (String ds : decisionSet) {
190 if (!(app.getNode(ds) instanceof KillNodeDef)) {
191 transitions.add(ds);
192 }
193 }
194 } else if (node instanceof ActionNodeDef) {
195 // Make sure the transition is valid
196 validateTransition(errorToTransitions, transitions, app, node);
197 // Add the "ok-to" transition of node if its not a kill node
198 String okTo = node.getTransitions().get(0);
199 if (!(app.getNode(okTo) instanceof KillNodeDef)) {
200 transitions.add(okTo);
201 }
202 String errorTo = node.getTransitions().get(1);
203 // Add the "error-to" transition if the transition is a Action Node
204 if (app.getNode(errorTo) instanceof ActionNodeDef) {
205 errorToTransitions.add(errorTo);
206 }
207 } else if (node instanceof ForkNodeDef) {
208 forkList.remove(node.getName());
209 // Make a recursive call to resolve this fork node
210 NodeDef joinNd = validateFork(node, app);
211 // Make sure the transition is valid
212 validateTransition(errorToTransitions, transitions, app, node);
213 // Add the "ok-to" transition of node
214 transitions.add(joinNd.getTransitions().get(0));
215 } else if (node instanceof JoinNodeDef) {
216 // If joinNode encountered for the first time, remove it from the joinList and remember it
217 String currentJoin = node.getName();
218 if (joinList.contains(currentJoin)) {
219 joinList.remove(currentJoin);
220 joinNode = currentJoin;
221 } else {
222 // Make sure this join is the same as the join seen from the first time
223 if (joinNode == null) {
224 throw new WorkflowException(ErrorCode.E0733, forkNode);
225 }
226 if (!joinNode.equals(currentJoin)) {
227 throw new WorkflowException(ErrorCode.E0732, forkNode, joinNode);
228 }
229 }
230 } else {
231 throw new WorkflowException(ErrorCode.E0730);
232 }
233 }
234 return app.getNode(joinNode);
235
236 }
237
238 private void validateTransition(List<String> errorToTransitions, List<String> transitions, LiteWorkflowApp app, NodeDef node)
239 throws WorkflowException {
240 for (String transition : node.getTransitions()) {
241 // Make sure the transition node is not an end node
242 NodeDef tNode = app.getNode(transition);
243 if (tNode instanceof EndNodeDef) {
244 throw new WorkflowException(ErrorCode.E0737, node.getName(), transition, "end");
245 }
246 // Make sure the transition node is either a join node or is not already visited
247 if (transitions.contains(transition) && !(tNode instanceof JoinNodeDef)) {
248 throw new WorkflowException(ErrorCode.E0734, node.getName(), transition);
249 }
250 // Make sure the transition node is not the same as an already visited 'error-to' transition
251 if (errorToTransitions.contains(transition)) {
252 throw new WorkflowException(ErrorCode.E0735, node.getName(), transition);
253 }
254 }
255
256 }
257
258 /**
259 * Parse xml to {@link LiteWorkflowApp}
260 *
261 * @param strDef
262 * @param root
263 * @return LiteWorkflowApp
264 * @throws WorkflowException
265 */
266 @SuppressWarnings({"unchecked", "ConstantConditions"})
267 private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException {
268 Namespace ns = root.getNamespace();
269 LiteWorkflowApp def = null;
270 Element global = null;
271 for (Element eNode : (List<Element>) root.getChildren()) {
272 if (eNode.getName().equals(START_E)) {
273 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
274 new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
275 }
276 else {
277 if (eNode.getName().equals(END_E)) {
278 def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
279 }
280 else {
281 if (eNode.getName().equals(KILL_E)) {
282 def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
283 eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
284 }
285 else {
286 if (eNode.getName().equals(FORK_E)) {
287 List<String> paths = new ArrayList<String>();
288 for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
289 paths.add(tran.getAttributeValue(FORK_START_A));
290 }
291 def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths));
292 }
293 else {
294 if (eNode.getName().equals(JOIN_E)) {
295 def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler,
296 eNode.getAttributeValue(TO_A)));
297 }
298 else {
299 if (eNode.getName().equals(DECISION_E)) {
300 Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
301 List<String> transitions = new ArrayList<String>();
302 for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
303 transitions.add(e.getAttributeValue(TO_A));
304 }
305 transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
306
307 String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
308 def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
309 transitions));
310 }
311 else {
312 if (ACTION_E.equals(eNode.getName())) {
313 String[] transitions = new String[2];
314 Element eActionConf = null;
315 for (Element elem : (List<Element>) eNode.getChildren()) {
316 if (ACTION_OK_E.equals(elem.getName())) {
317 transitions[0] = elem.getAttributeValue(TO_A);
318 }
319 else {
320 if (ACTION_ERROR_E.equals(elem.getName())) {
321 transitions[1] = elem.getAttributeValue(TO_A);
322 }
323 else {
324 if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
325 continue;
326 }
327 else {
328 eActionConf = elem;
329 handleGlobal(ns, global, elem);
330 }
331 }
332 }
333 }
334
335 String credStr = eNode.getAttributeValue(CRED_A);
336 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
337 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
338
339 String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
340 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
341 transitions[0], transitions[1], credStr,
342 userRetryMaxStr, userRetryIntervalStr));
343 }
344 else {
345 if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
346 // No operation is required
347 }
348 else {
349 if (eNode.getName().equals(GLOBAL)) {
350 global = eNode;
351 }
352 else {
353 if (eNode.getName().equals(PARAMETERS)) {
354 // No operation is required
355 }
356 else {
357 throw new WorkflowException(ErrorCode.E0703, eNode.getName());
358 }
359 }
360 }
361 }
362 }
363 }
364 }
365 }
366 }
367 }
368 }
369 return def;
370 }
371
372 /**
373 * Validate workflow xml
374 *
375 * @param app
376 * @param node
377 * @param traversed
378 * @throws WorkflowException
379 */
380 private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
381 if (!(node instanceof StartNodeDef)) {
382 try {
383 ParamChecker.validateActionName(node.getName());
384 }
385 catch (IllegalArgumentException ex) {
386 throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
387 }
388 }
389 if (node instanceof ActionNodeDef) {
390 try {
391 Element action = XmlUtils.parseXml(node.getConf());
392 boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
393 if (!supportedAction) {
394 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
395 }
396 }
397 catch (JDOMException ex) {
398 throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
399 }
400 }
401
402 if(node instanceof ForkNodeDef){
403 forkList.add(node.getName());
404 }
405
406 if(node instanceof JoinNodeDef){
407 joinList.add(node.getName());
408 }
409
410 if (node instanceof EndNodeDef) {
411 traversed.put(node.getName(), VisitStatus.VISITED);
412 return;
413 }
414 if (node instanceof KillNodeDef) {
415 traversed.put(node.getName(), VisitStatus.VISITED);
416 return;
417 }
418 for (String transition : node.getTransitions()) {
419
420 if (app.getNode(transition) == null) {
421 throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
422 }
423
424 //check if it is a cycle
425 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
426 throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
427 }
428 //ignore validated one
429 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
430 continue;
431 }
432
433 traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
434 validate(app, app.getNode(transition), traversed);
435 }
436 traversed.put(node.getName(), VisitStatus.VISITED);
437 }
438
439 /**
440 * Handle the global section
441 *
442 * @param ns
443 * @param global
444 * @param eActionConf
445 * @throws WorkflowException
446 */
447
448 private void handleGlobal(Namespace ns, Element global, Element eActionConf) throws WorkflowException {
449
450 // Use the action's namespace when getting children of the action (will be different than ns for extension actions)
451 Namespace actionNs = eActionConf.getNamespace();
452
453 if (global != null) {
454 Element globalJobTracker = global.getChild("job-tracker", ns);
455 Element globalNameNode = global.getChild("name-node", ns);
456 List<Element> globalJobXml = global.getChildren("job-xml", ns);
457 Element globalConfiguration = global.getChild("configuration", ns);
458
459 if (globalJobTracker != null && eActionConf.getChild("job-tracker", actionNs) == null) {
460 Element jobTracker = new Element("job-tracker", actionNs);
461 jobTracker.setText(globalJobTracker.getText());
462 eActionConf.addContent(jobTracker);
463 }
464
465 if (globalNameNode != null && eActionConf.getChild("name-node", actionNs) == null) {
466 Element nameNode = new Element("name-node", actionNs);
467 nameNode.setText(globalNameNode.getText());
468 eActionConf.addContent(nameNode);
469 }
470
471 if (!globalJobXml.isEmpty()) {
472 List<Element> actionJobXml = eActionConf.getChildren("job-xml", actionNs);
473 for(Element jobXml: globalJobXml){
474 boolean alreadyExists = false;
475 for(Element actionXml: actionJobXml){
476 if(jobXml.getText().equals(actionXml.getText())){
477 alreadyExists = true;
478 }
479 }
480
481 if (!alreadyExists){
482 Element ejobXml = new Element("job-xml", actionNs);
483 ejobXml.setText(jobXml.getText());
484 eActionConf.addContent(ejobXml);
485 }
486
487 }
488 }
489
490 if (globalConfiguration != null) {
491 Element actionConfiguration = eActionConf.getChild("configuration", actionNs);
492 if (actionConfiguration == null) {
493 actionConfiguration = new Element("configuration", actionNs);
494 eActionConf.addContent(actionConfiguration);
495 }
496 for (Element globalConfig : (List<Element>) globalConfiguration.getChildren()) {
497 boolean isSet = false;
498 String globalVarName = globalConfig.getChildText("name", ns);
499 for (Element local : (List<Element>) actionConfiguration.getChildren()) {
500 if (local.getChildText("name", actionNs).equals(globalVarName)) {
501 isSet = true;
502 }
503 }
504 if (!isSet) {
505 Element varToCopy = new Element("property", actionNs);
506 Element varName = new Element("name", actionNs);
507 Element varValue = new Element("value", actionNs);
508
509 varName.setText(globalConfig.getChildText("name", ns));
510 varValue.setText(globalConfig.getChildText("value", ns));
511
512 varToCopy.addContent(varName);
513 varToCopy.addContent(varValue);
514
515 actionConfiguration.addContent(varToCopy);
516 }
517 }
518 }
519 }
520 else {
521 ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(eActionConf.getName());
522 if (ae == null) {
523 throw new WorkflowException(ErrorCode.E0723, "Unsupported action type");
524 }
525 if (ae.requiresNNJT) {
526
527 if (eActionConf.getChild("name-node", actionNs) == null) {
528 throw new WorkflowException(ErrorCode.E0701, "No name-node defined");
529 }
530 if (eActionConf.getChild("job-tracker", actionNs) == null) {
531 throw new WorkflowException(ErrorCode.E0701, "No job-tracker defined");
532 }
533 }
534 }
535 }
536
537 }