001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.workflow.lite;
019    
020    import org.apache.oozie.workflow.WorkflowException;
021    import org.apache.oozie.util.IOUtils;
022    import org.apache.oozie.util.XmlUtils;
023    import org.apache.oozie.util.ParamChecker;
024    import org.apache.oozie.util.ParameterVerifier;
025    import org.apache.oozie.util.ParameterVerifierException;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.action.ActionExecutor;
028    import org.apache.oozie.service.Services;
029    import org.apache.oozie.service.ActionService;
030    import org.apache.hadoop.conf.Configuration;
031    import org.jdom.Element;
032    import org.jdom.JDOMException;
033    import org.jdom.Namespace;
034    import org.xml.sax.SAXException;
035    
036    import javax.xml.transform.stream.StreamSource;
037    import javax.xml.validation.Schema;
038    import javax.xml.validation.Validator;
039    import java.io.IOException;
040    import java.io.Reader;
041    import java.io.StringReader;
042    import java.io.StringWriter;
043    import java.util.ArrayList;
044    import java.util.Arrays;
045    import java.util.Deque;
046    import java.util.HashMap;
047    import java.util.HashSet;
048    import java.util.LinkedList;
049    import java.util.List;
050    import java.util.Map;
051    
052    /**
053     * Class to parse and validate workflow xml
054     */
055    public class LiteWorkflowAppParser {
056    
057        private static final String DECISION_E = "decision";
058        private static final String ACTION_E = "action";
059        private static final String END_E = "end";
060        private static final String START_E = "start";
061        private static final String JOIN_E = "join";
062        private static final String FORK_E = "fork";
063        private static final Object KILL_E = "kill";
064    
065        private static final String SLA_INFO = "info";
066        private static final String CREDENTIALS = "credentials";
067        private static final String GLOBAL = "global";
068        private static final String PARAMETERS = "parameters";
069    
070        private static final String NAME_A = "name";
071        private static final String CRED_A = "cred";
072        private static final String USER_RETRY_MAX_A = "retry-max";
073        private static final String USER_RETRY_INTERVAL_A = "retry-interval";
074        private static final String TO_A = "to";
075    
076        private static final String FORK_PATH_E = "path";
077        private static final String FORK_START_A = "start";
078    
079        private static final String ACTION_OK_E = "ok";
080        private static final String ACTION_ERROR_E = "error";
081    
082        private static final String DECISION_SWITCH_E = "switch";
083        private static final String DECISION_CASE_E = "case";
084        private static final String DECISION_DEFAULT_E = "default";
085    
086        private static final String KILL_MESSAGE_E = "message";
087        public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
088        public static final String WF_VALIDATE_FORK_JOIN = "oozie.wf.validate.ForkJoin";
089    
090        private Schema schema;
091        private Class<? extends ControlNodeHandler> controlNodeHandler;
092        private Class<? extends DecisionNodeHandler> decisionHandlerClass;
093        private Class<? extends ActionNodeHandler> actionHandlerClass;
094    
095        private static enum VisitStatus {
096            VISITING, VISITED
097        }
098    
099        private List<String> forkList = new ArrayList<String>();
100        private List<String> joinList = new ArrayList<String>();
101        private StartNodeDef startNode;
102        private List<String> visitedOkNodes = new ArrayList<String>();
103        private List<String> visitedJoinNodes = new ArrayList<String>();
104    
/**
 * Creates a parser that builds node definitions bound to the given handler classes.
 *
 * @param schema schema used to validate workflow definitions; validation is skipped when it is {@code null}
 * @param controlNodeHandler handler class attached to the control nodes (start, end, kill, fork, join)
 * @param decisionHandlerClass handler class attached to decision nodes
 * @param actionHandlerClass handler class attached to action nodes
 * @throws WorkflowException declared for callers; this constructor itself only stores its arguments
 */
public LiteWorkflowAppParser(Schema schema,
                             Class<? extends ControlNodeHandler> controlNodeHandler,
                             Class<? extends DecisionNodeHandler> decisionHandlerClass,
                             Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
    // Handler classes first, schema last; the assignments are independent of one another.
    this.actionHandlerClass = actionHandlerClass;
    this.decisionHandlerClass = decisionHandlerClass;
    this.controlNodeHandler = controlNodeHandler;
    this.schema = schema;
}
114    
115        /**
116         * Parse and validate xml to {@link LiteWorkflowApp}
117         *
118         * @param reader
119         * @return LiteWorkflowApp
120         * @throws WorkflowException
121         */
122        public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException {
123            try {
124                StringWriter writer = new StringWriter();
125                IOUtils.copyCharStream(reader, writer);
126                String strDef = writer.toString();
127    
128                if (schema != null) {
129                    Validator validator = schema.newValidator();
130                    validator.validate(new StreamSource(new StringReader(strDef)));
131                }
132    
133                Element wfDefElement = XmlUtils.parseXml(strDef);
134                ParameterVerifier.verifyParameters(jobConf, wfDefElement);
135                LiteWorkflowApp app = parse(strDef, wfDefElement);
136                Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
137                traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
138                validate(app, app.getNode(StartNodeDef.START), traversed);
139                //Validate whether fork/join are in pair or not
140                if (jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true) && Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
141                    validateForkJoin(app);
142                }
143                return app;
144            }
145            catch (ParameterVerifierException ex) {
146                throw new WorkflowException(ex);
147            }
148            catch (JDOMException ex) {
149                throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
150            }
151            catch (SAXException ex) {
152                throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
153            }
154            catch (IOException ex) {
155                throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
156            }
157        }
158    
159        /**
160         * Validate whether fork/join are in pair or not
161         * @param app LiteWorkflowApp
162         * @throws WorkflowException
163         */
164        private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
165            // Make sure the number of forks and joins in wf are equal
166            if (forkList.size() != joinList.size()) {
167                throw new WorkflowException(ErrorCode.E0730);
168            }
169    
170            // No need to bother going through all of this if there are no fork/join nodes
171            if (!forkList.isEmpty()) {
172                visitedOkNodes.clear();
173                visitedJoinNodes.clear();
174                validateForkJoin(startNode, app, new LinkedList<String>(), new LinkedList<String>(), new LinkedList<String>(), true);
175            }
176        }
177    
178        /*
179         * Recursively walk through the DAG and make sure that all fork paths are valid.
180         * This should be called from validateForkJoin(LiteWorkflowApp app).  It assumes that visitedOkNodes and visitedJoinNodes are
181         * both empty ArrayLists on the first call.
182         *
183         * @param node the current node; use the startNode on the first call
184         * @param app the WorkflowApp
185         * @param forkNodes a stack of the current fork nodes
186         * @param joinNodes a stack of the current join nodes
187         * @param path a stack of the current path
188         * @param okTo false if node (or an ancestor of node) was gotten to via an "error to" transition or via a join node that has
189         * already been visited at least once before
190         * @throws WorkflowException
191         */
192        private void validateForkJoin(NodeDef node, LiteWorkflowApp app, Deque<String> forkNodes, Deque<String> joinNodes,
193                Deque<String> path, boolean okTo) throws WorkflowException {
194            if (path.contains(node.getName())) {
195                // cycle
196                throw new WorkflowException(ErrorCode.E0741, node.getName(), Arrays.toString(path.toArray()));
197            }
198            path.push(node.getName());
199    
200            // Make sure that we're not revisiting a node (that's not a Kill, Join, or End type) that's been visited before from an
201            // "ok to" transition; if its from an "error to" transition, then its okay to visit it multiple times.  Also, because we
202            // traverse through join nodes multiple times, we have to make sure not to throw an exception here when we're really just
203            // re-walking the same execution path (this is why we need the visitedJoinNodes list used later)
204            if (okTo && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) {
205                if (visitedOkNodes.contains(node.getName())) {
206                    throw new WorkflowException(ErrorCode.E0743, node.getName());
207                }
208                visitedOkNodes.add(node.getName());
209            }
210    
211            if (node instanceof StartNodeDef) {
212                String transition = node.getTransitions().get(0);   // start always has only 1 transition
213                NodeDef tranNode = app.getNode(transition);
214                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
215            }
216            else if (node instanceof ActionNodeDef) {
217                String transition = node.getTransitions().get(0);   // "ok to" transition
218                NodeDef tranNode = app.getNode(transition);
219                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);  // propogate okTo
220                transition = node.getTransitions().get(1);          // "error to" transition
221                tranNode = app.getNode(transition);
222                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false); // use false
223            }
224            else if (node instanceof DecisionNodeDef) {
225                for(String transition : (new HashSet<String>(node.getTransitions()))) {
226                    NodeDef tranNode = app.getNode(transition);
227                    validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
228                }
229            }
230            else if (node instanceof ForkNodeDef) {
231                forkNodes.push(node.getName());
232                for(String transition : (new HashSet<String>(node.getTransitions()))) {
233                    NodeDef tranNode = app.getNode(transition);
234                    validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
235                }
236                forkNodes.pop();
237                if (!joinNodes.isEmpty()) {
238                    joinNodes.pop();
239                }
240            }
241            else if (node instanceof JoinNodeDef) {
242                if (forkNodes.isEmpty()) {
243                    // no fork for join to match with
244                    throw new WorkflowException(ErrorCode.E0742, node.getName());
245                }
246                if (forkNodes.size() > joinNodes.size() && (joinNodes.isEmpty() || !joinNodes.peek().equals(node.getName()))) {
247                    joinNodes.push(node.getName());
248                }
249                if (!joinNodes.peek().equals(node.getName())) {
250                    // join doesn't match fork
251                    throw new WorkflowException(ErrorCode.E0732, forkNodes.peek(), node.getName(), joinNodes.peek());
252                }
253                joinNodes.pop();
254                String currentForkNode = forkNodes.pop();
255                String transition = node.getTransitions().get(0);   // join always has only 1 transition
256                NodeDef tranNode = app.getNode(transition);
257                // If we're already under a situation where okTo is false, use false (propogate it)
258                // Or if we've already visited this join node, use false (because we've already traversed this path before and we don't
259                // want to throw an exception from the check against visitedOkNodes)
260                if (!okTo || visitedJoinNodes.contains(node.getName())) {
261                    validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false);
262                // Else, use true because this is either the first time we've gone through this join node or okTo was already false
263                } else {
264                    visitedJoinNodes.add(node.getName());
265                    validateForkJoin(tranNode, app, forkNodes, joinNodes, path, true);
266                }
267                forkNodes.push(currentForkNode);
268                joinNodes.push(node.getName());
269            }
270            else if (node instanceof KillNodeDef) {
271                // do nothing
272            }
273            else if (node instanceof EndNodeDef) {
274                if (!forkNodes.isEmpty()) {
275                    path.pop();     // = node
276                    String parent = path.peek();
277                    // can't go to an end node in a fork
278                    throw new WorkflowException(ErrorCode.E0737, parent, node.getName());
279                }
280            }
281            else {
282                // invalid node type (shouldn't happen)
283                throw new WorkflowException(ErrorCode.E0740, node.getName());
284            }
285            path.pop();
286        }
287    
288        /**
289         * Parse xml to {@link LiteWorkflowApp}
290         *
291         * @param strDef
292         * @param root
293         * @return LiteWorkflowApp
294         * @throws WorkflowException
295         */
296        @SuppressWarnings({"unchecked", "ConstantConditions"})
297        private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException {
298            Namespace ns = root.getNamespace();
299            LiteWorkflowApp def = null;
300            Element global = null;
301            for (Element eNode : (List<Element>) root.getChildren()) {
302                if (eNode.getName().equals(START_E)) {
303                    def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
304                                              new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
305                }
306                else {
307                    if (eNode.getName().equals(END_E)) {
308                        def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
309                    }
310                    else {
311                        if (eNode.getName().equals(KILL_E)) {
312                            def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
313                                                        eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
314                        }
315                        else {
316                            if (eNode.getName().equals(FORK_E)) {
317                                List<String> paths = new ArrayList<String>();
318                                for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
319                                    paths.add(tran.getAttributeValue(FORK_START_A));
320                                }
321                                def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths));
322                            }
323                            else {
324                                if (eNode.getName().equals(JOIN_E)) {
325                                    def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler,
326                                                                eNode.getAttributeValue(TO_A)));
327                                }
328                                else {
329                                    if (eNode.getName().equals(DECISION_E)) {
330                                        Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
331                                        List<String> transitions = new ArrayList<String>();
332                                        for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
333                                            transitions.add(e.getAttributeValue(TO_A));
334                                        }
335                                        transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
336    
337                                        String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
338                                        def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
339                                                                        transitions));
340                                    }
341                                    else {
342                                        if (ACTION_E.equals(eNode.getName())) {
343                                            String[] transitions = new String[2];
344                                            Element eActionConf = null;
345                                            for (Element elem : (List<Element>) eNode.getChildren()) {
346                                                if (ACTION_OK_E.equals(elem.getName())) {
347                                                    transitions[0] = elem.getAttributeValue(TO_A);
348                                                }
349                                                else {
350                                                    if (ACTION_ERROR_E.equals(elem.getName())) {
351                                                        transitions[1] = elem.getAttributeValue(TO_A);
352                                                    }
353                                                    else {
354                                                        if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
355                                                            continue;
356                                                        }
357                                                        else {
358                                                            eActionConf = elem;
359                                                            handleGlobal(ns, global, elem);
360                                                            }
361                                                    }
362                                                }
363                                            }
364    
365                                            String credStr = eNode.getAttributeValue(CRED_A);
366                                            String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
367                                            String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
368    
369                                            String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
370                                            def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
371                                                                          transitions[0], transitions[1], credStr,
372                                                                          userRetryMaxStr, userRetryIntervalStr));
373                                        }
374                                        else {
375                                            if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
376                                                // No operation is required
377                                            }
378                                            else {
379                                                if (eNode.getName().equals(GLOBAL)) {
380                                                    global = eNode;
381                                                }
382                                                else {
383                                                    if (eNode.getName().equals(PARAMETERS)) {
384                                                        // No operation is required
385                                                    }
386                                                    else {
387                                                        throw new WorkflowException(ErrorCode.E0703, eNode.getName());
388                                                    }
389                                                }
390                                            }
391                                        }
392                                    }
393                                }
394                            }
395                        }
396                    }
397                }
398            }
399            return def;
400        }
401    
402        /**
403         * Validate workflow xml
404         *
405         * @param app
406         * @param node
407         * @param traversed
408         * @throws WorkflowException
409         */
410        private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
411            if (node instanceof StartNodeDef) {
412                startNode = (StartNodeDef) node;
413            }
414            else {
415                try {
416                    ParamChecker.validateActionName(node.getName());
417                }
418                catch (IllegalArgumentException ex) {
419                    throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
420                }
421            }
422            if (node instanceof ActionNodeDef) {
423                try {
424                    Element action = XmlUtils.parseXml(node.getConf());
425                    boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
426                    if (!supportedAction) {
427                        throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
428                    }
429                }
430                catch (JDOMException ex) {
431                    throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
432                }
433            }
434    
435            if(node instanceof ForkNodeDef){
436                forkList.add(node.getName());
437            }
438    
439            if(node instanceof JoinNodeDef){
440                joinList.add(node.getName());
441            }
442    
443            if (node instanceof EndNodeDef) {
444                traversed.put(node.getName(), VisitStatus.VISITED);
445                return;
446            }
447            if (node instanceof KillNodeDef) {
448                traversed.put(node.getName(), VisitStatus.VISITED);
449                return;
450            }
451            for (String transition : node.getTransitions()) {
452    
453                if (app.getNode(transition) == null) {
454                    throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
455                }
456    
457                //check if it is a cycle
458                if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
459                    throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
460                }
461                //ignore validated one
462                if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
463                    continue;
464                }
465    
466                traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
467                validate(app, app.getNode(transition), traversed);
468            }
469            traversed.put(node.getName(), VisitStatus.VISITED);
470        }
471    
472        /**
473         * Handle the global section
474         *
475         * @param ns
476         * @param global
477         * @param eActionConf
478         * @throws WorkflowException
479         */
480    
481        private void handleGlobal(Namespace ns, Element global, Element eActionConf) throws WorkflowException {
482    
483            // Use the action's namespace when getting children of the action (will be different than ns for extension actions)
484            Namespace actionNs = eActionConf.getNamespace();
485    
486            if (global != null) {
487                Element globalJobTracker = global.getChild("job-tracker", ns);
488                Element globalNameNode = global.getChild("name-node", ns);
489                List<Element> globalJobXml = global.getChildren("job-xml", ns);
490                Element globalConfiguration = global.getChild("configuration", ns);
491    
492                if (globalJobTracker != null && eActionConf.getChild("job-tracker", actionNs) == null) {
493                    Element jobTracker = new Element("job-tracker", actionNs);
494                    jobTracker.setText(globalJobTracker.getText());
495                    eActionConf.addContent(jobTracker);
496                }
497    
498                if (globalNameNode != null && eActionConf.getChild("name-node", actionNs) == null) {
499                    Element nameNode = new Element("name-node", actionNs);
500                    nameNode.setText(globalNameNode.getText());
501                    eActionConf.addContent(nameNode);
502                }
503    
504                if (!globalJobXml.isEmpty()) {
505                    List<Element> actionJobXml = eActionConf.getChildren("job-xml", actionNs);
506                    for(Element jobXml: globalJobXml){
507                        boolean alreadyExists = false;
508                        for(Element actionXml: actionJobXml){
509                            if(jobXml.getText().equals(actionXml.getText())){
510                                alreadyExists = true;
511                            }
512                        }
513    
514                        if (!alreadyExists){
515                            Element ejobXml = new Element("job-xml", actionNs);
516                            ejobXml.setText(jobXml.getText());
517                            eActionConf.addContent(ejobXml);
518                        }
519    
520                    }
521                }
522    
523                if (globalConfiguration != null) {
524                    Element actionConfiguration = eActionConf.getChild("configuration", actionNs);
525                    if (actionConfiguration == null) {
526                        actionConfiguration = new Element("configuration", actionNs);
527                        eActionConf.addContent(actionConfiguration);
528                    }
529                    for (Element globalConfig : (List<Element>) globalConfiguration.getChildren()) {
530                        boolean isSet = false;
531                        String globalVarName = globalConfig.getChildText("name", ns);
532                        for (Element local : (List<Element>) actionConfiguration.getChildren()) {
533                            if (local.getChildText("name", actionNs).equals(globalVarName)) {
534                                isSet = true;
535                            }
536                        }
537                        if (!isSet) {
538                            Element varToCopy = new Element("property", actionNs);
539                            Element varName = new Element("name", actionNs);
540                            Element varValue = new Element("value", actionNs);
541    
542                            varName.setText(globalConfig.getChildText("name", ns));
543                            varValue.setText(globalConfig.getChildText("value", ns));
544    
545                            varToCopy.addContent(varName);
546                            varToCopy.addContent(varValue);
547    
548                            actionConfiguration.addContent(varToCopy);
549                        }
550                    }
551                }
552            }
553            else {
554                ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(eActionConf.getName());
555                if (ae == null) {
556                    throw new WorkflowException(ErrorCode.E0723, eActionConf.getName(), ActionService.class.getName());
557                }
558                if (ae.requiresNNJT) {
559    
560                    if (eActionConf.getChild("name-node", actionNs) == null) {
561                        throw new WorkflowException(ErrorCode.E0701, "No name-node defined");
562                    }
563                    if (eActionConf.getChild("job-tracker", actionNs) == null) {
564                        throw new WorkflowException(ErrorCode.E0701, "No job-tracker defined");
565                    }
566                }
567            }
568        }
569    
570    }