001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.workflow.lite;
019    
020    import org.apache.oozie.workflow.WorkflowException;
021    import org.apache.oozie.util.IOUtils;
022    import org.apache.oozie.util.XmlUtils;
023    import org.apache.oozie.util.ParamChecker;
024    import org.apache.oozie.util.ParameterVerifier;
025    import org.apache.oozie.util.ParameterVerifierException;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.action.ActionExecutor;
028    import org.apache.oozie.service.Services;
029    import org.apache.oozie.service.ActionService;
030    import org.apache.hadoop.conf.Configuration;
031    import org.jdom.Element;
032    import org.jdom.JDOMException;
033    import org.jdom.Namespace;
034    import org.xml.sax.SAXException;
035    
036    import javax.xml.transform.stream.StreamSource;
037    import javax.xml.validation.Schema;
038    import javax.xml.validation.Validator;
039    import java.io.IOException;
040    import java.io.Reader;
041    import java.io.StringReader;
042    import java.io.StringWriter;
043    import java.util.ArrayList;
044    import java.util.HashMap;
045    import java.util.HashSet;
046    import java.util.List;
047    import java.util.Map;
048    
049    /**
050     * Class to parse and validate workflow xml
051     */
052    public class LiteWorkflowAppParser {
053    
054        private static final String DECISION_E = "decision";
055        private static final String ACTION_E = "action";
056        private static final String END_E = "end";
057        private static final String START_E = "start";
058        private static final String JOIN_E = "join";
059        private static final String FORK_E = "fork";
060        private static final Object KILL_E = "kill";
061    
062        private static final String SLA_INFO = "info";
063        private static final String CREDENTIALS = "credentials";
064        private static final String GLOBAL = "global";
065        private static final String PARAMETERS = "parameters";
066    
067        private static final String NAME_A = "name";
068        private static final String CRED_A = "cred";
069        private static final String USER_RETRY_MAX_A = "retry-max";
070        private static final String USER_RETRY_INTERVAL_A = "retry-interval";
071        private static final String TO_A = "to";
072    
073        private static final String FORK_PATH_E = "path";
074        private static final String FORK_START_A = "start";
075    
076        private static final String ACTION_OK_E = "ok";
077        private static final String ACTION_ERROR_E = "error";
078    
079        private static final String DECISION_SWITCH_E = "switch";
080        private static final String DECISION_CASE_E = "case";
081        private static final String DECISION_DEFAULT_E = "default";
082    
083        private static final String KILL_MESSAGE_E = "message";
084        public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
085    
086        private Schema schema;
087        private Class<? extends ControlNodeHandler> controlNodeHandler;
088        private Class<? extends DecisionNodeHandler> decisionHandlerClass;
089        private Class<? extends ActionNodeHandler> actionHandlerClass;
090    
091        private static enum VisitStatus {
092            VISITING, VISITED
093        }
094    
095        private List<String> forkList = new ArrayList<String>();
096        private List<String> joinList = new ArrayList<String>();
097    
098        public LiteWorkflowAppParser(Schema schema,
099                                     Class<? extends ControlNodeHandler> controlNodeHandler,
100                                     Class<? extends DecisionNodeHandler> decisionHandlerClass,
101                                     Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
102            this.schema = schema;
103            this.controlNodeHandler = controlNodeHandler;
104            this.decisionHandlerClass = decisionHandlerClass;
105            this.actionHandlerClass = actionHandlerClass;
106        }
107    
108        /**
109         * Parse and validate xml to {@link LiteWorkflowApp}
110         *
111         * @param reader
112         * @return LiteWorkflowApp
113         * @throws WorkflowException
114         */
115        public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException {
116            try {
117                StringWriter writer = new StringWriter();
118                IOUtils.copyCharStream(reader, writer);
119                String strDef = writer.toString();
120    
121                if (schema != null) {
122                    Validator validator = schema.newValidator();
123                    validator.validate(new StreamSource(new StringReader(strDef)));
124                }
125    
126                Element wfDefElement = XmlUtils.parseXml(strDef);
127                ParameterVerifier.verifyParameters(jobConf, wfDefElement);
128                LiteWorkflowApp app = parse(strDef, wfDefElement);
129                Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
130                traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
131                validate(app, app.getNode(StartNodeDef.START), traversed);
132                //Validate whether fork/join are in pair or not
133                if (Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
134                    validateForkJoin(app);
135                }
136                return app;
137            }
138            catch (ParameterVerifierException ex) {
139                throw new WorkflowException(ex);
140            }
141            catch (JDOMException ex) {
142                throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
143            }
144            catch (SAXException ex) {
145                throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
146            }
147            catch (IOException ex) {
148                throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
149            }
150        }
151    
152        /**
153         * Validate whether fork/join are in pair or not
154         * @param app LiteWorkflowApp
155         * @throws WorkflowException
156         */
157        private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
158            // Make sure the number of forks and joins in wf are equal
159            if (forkList.size() != joinList.size()) {
160                throw new WorkflowException(ErrorCode.E0730);
161            }
162    
163            while(!forkList.isEmpty()){
164                // Make sure each of the fork node has a corresponding join; start with the root fork Node first
165                validateFork(app.getNode(forkList.remove(0)), app);
166            }
167    
168        }
169    
170        /*
171         * Test whether the fork node has a corresponding join
172         * @param node - the fork node
173         * @param app - the WorkflowApp
174         * @return
175         * @throws WorkflowException
176         */
177        private NodeDef validateFork(NodeDef forkNode, LiteWorkflowApp app) throws WorkflowException {
178            List<String> transitions = new ArrayList<String>(forkNode.getTransitions());
179            // list for keeping track of "error-to" transitions of Action Node
180            List<String> errorToTransitions = new ArrayList<String>();
181            String joinNode = null;
182            for (int i = 0; i < transitions.size(); i++) {
183                NodeDef node = app.getNode(transitions.get(i));
184                if (node instanceof DecisionNodeDef) {
185                    // Make sure the transition is valid
186                    validateTransition(errorToTransitions, transitions, app, node);
187                    // Add each transition to transitions (once) if they are not a kill node
188                    HashSet<String> decisionSet = new HashSet<String>(node.getTransitions());
189                    for (String ds : decisionSet) {
190                        if (!(app.getNode(ds) instanceof KillNodeDef)) {
191                            transitions.add(ds);
192                        }
193                    }
194                } else if (node instanceof ActionNodeDef) {
195                    // Make sure the transition is valid
196                    validateTransition(errorToTransitions, transitions, app, node);
197                    // Add the "ok-to" transition of node if its not a kill node
198                    String okTo = node.getTransitions().get(0);
199                    if (!(app.getNode(okTo) instanceof KillNodeDef)) {
200                        transitions.add(okTo);
201                    }
202                    String errorTo = node.getTransitions().get(1);
203                    // Add the "error-to" transition if the transition is a Action Node
204                    if (app.getNode(errorTo) instanceof ActionNodeDef) {
205                        errorToTransitions.add(errorTo);
206                    }
207                } else if (node instanceof ForkNodeDef) {
208                    forkList.remove(node.getName());
209                    // Make a recursive call to resolve this fork node
210                    NodeDef joinNd = validateFork(node, app);
211                    // Make sure the transition is valid
212                    validateTransition(errorToTransitions, transitions, app, node);
213                    // Add the "ok-to" transition of node
214                    transitions.add(joinNd.getTransitions().get(0));
215                } else if (node instanceof JoinNodeDef) {
216                    // If joinNode encountered for the first time, remove it from the joinList and remember it
217                    String currentJoin = node.getName();
218                    if (joinList.contains(currentJoin)) {
219                        joinList.remove(currentJoin);
220                        joinNode = currentJoin;
221                    } else {
222                        // Make sure this join is the same as the join seen from the first time
223                        if (joinNode == null) {
224                            throw new WorkflowException(ErrorCode.E0733, forkNode);
225                        }
226                        if (!joinNode.equals(currentJoin)) {
227                            throw new WorkflowException(ErrorCode.E0732, forkNode, joinNode);
228                        }
229                    }
230                } else {
231                    throw new WorkflowException(ErrorCode.E0730);
232                }
233            }
234            return app.getNode(joinNode);
235    
236        }
237    
238        private void validateTransition(List<String> errorToTransitions, List<String> transitions, LiteWorkflowApp app, NodeDef node)
239                throws WorkflowException {
240            for (String transition : node.getTransitions()) {
241                // Make sure the transition node is not an end node
242                NodeDef tNode = app.getNode(transition);
243                if (tNode instanceof EndNodeDef) {
244                    throw new WorkflowException(ErrorCode.E0737, node.getName(), transition, "end");
245                }
246                // Make sure the transition node is either a join node or is not already visited
247                if (transitions.contains(transition) && !(tNode instanceof JoinNodeDef)) {
248                        throw new WorkflowException(ErrorCode.E0734, node.getName(), transition);
249                    }
250                    // Make sure the transition node is not the same as an already visited 'error-to' transition
251                    if (errorToTransitions.contains(transition)) {
252                        throw new WorkflowException(ErrorCode.E0735, node.getName(), transition);
253                    }
254                }
255    
256        }
257    
258        /**
259         * Parse xml to {@link LiteWorkflowApp}
260         *
261         * @param strDef
262         * @param root
263         * @return LiteWorkflowApp
264         * @throws WorkflowException
265         */
266        @SuppressWarnings({"unchecked", "ConstantConditions"})
267        private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException {
268            Namespace ns = root.getNamespace();
269            LiteWorkflowApp def = null;
270            Element global = null;
271            for (Element eNode : (List<Element>) root.getChildren()) {
272                if (eNode.getName().equals(START_E)) {
273                    def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
274                                              new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
275                }
276                else {
277                    if (eNode.getName().equals(END_E)) {
278                        def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
279                    }
280                    else {
281                        if (eNode.getName().equals(KILL_E)) {
282                            def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
283                                                        eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
284                        }
285                        else {
286                            if (eNode.getName().equals(FORK_E)) {
287                                List<String> paths = new ArrayList<String>();
288                                for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
289                                    paths.add(tran.getAttributeValue(FORK_START_A));
290                                }
291                                def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths));
292                            }
293                            else {
294                                if (eNode.getName().equals(JOIN_E)) {
295                                    def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler,
296                                                                eNode.getAttributeValue(TO_A)));
297                                }
298                                else {
299                                    if (eNode.getName().equals(DECISION_E)) {
300                                        Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
301                                        List<String> transitions = new ArrayList<String>();
302                                        for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
303                                            transitions.add(e.getAttributeValue(TO_A));
304                                        }
305                                        transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
306    
307                                        String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
308                                        def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
309                                                                        transitions));
310                                    }
311                                    else {
312                                        if (ACTION_E.equals(eNode.getName())) {
313                                            String[] transitions = new String[2];
314                                            Element eActionConf = null;
315                                            for (Element elem : (List<Element>) eNode.getChildren()) {
316                                                if (ACTION_OK_E.equals(elem.getName())) {
317                                                    transitions[0] = elem.getAttributeValue(TO_A);
318                                                }
319                                                else {
320                                                    if (ACTION_ERROR_E.equals(elem.getName())) {
321                                                        transitions[1] = elem.getAttributeValue(TO_A);
322                                                    }
323                                                    else {
324                                                        if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
325                                                            continue;
326                                                        }
327                                                        else {
328                                                            eActionConf = elem;
329                                                            handleGlobal(ns, global, elem);
330                                                            }
331                                                    }
332                                                }
333                                            }
334    
335                                            String credStr = eNode.getAttributeValue(CRED_A);
336                                            String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
337                                            String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
338    
339                                            String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
340                                            def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
341                                                                          transitions[0], transitions[1], credStr,
342                                                                          userRetryMaxStr, userRetryIntervalStr));
343                                        }
344                                        else {
345                                            if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
346                                                // No operation is required
347                                            }
348                                            else {
349                                                if (eNode.getName().equals(GLOBAL)) {
350                                                    global = eNode;
351                                                }
352                                                else {
353                                                    if (eNode.getName().equals(PARAMETERS)) {
354                                                        // No operation is required
355                                                    }
356                                                    else {
357                                                        throw new WorkflowException(ErrorCode.E0703, eNode.getName());
358                                                    }
359                                                }
360                                            }
361                                        }
362                                    }
363                                }
364                            }
365                        }
366                    }
367                }
368            }
369            return def;
370        }
371    
372        /**
373         * Validate workflow xml
374         *
375         * @param app
376         * @param node
377         * @param traversed
378         * @throws WorkflowException
379         */
380        private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
381            if (!(node instanceof StartNodeDef)) {
382                try {
383                    ParamChecker.validateActionName(node.getName());
384                }
385                catch (IllegalArgumentException ex) {
386                    throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
387                }
388            }
389            if (node instanceof ActionNodeDef) {
390                try {
391                    Element action = XmlUtils.parseXml(node.getConf());
392                    boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
393                    if (!supportedAction) {
394                        throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
395                    }
396                }
397                catch (JDOMException ex) {
398                    throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
399                }
400            }
401    
402            if(node instanceof ForkNodeDef){
403                forkList.add(node.getName());
404            }
405    
406            if(node instanceof JoinNodeDef){
407                joinList.add(node.getName());
408            }
409    
410            if (node instanceof EndNodeDef) {
411                traversed.put(node.getName(), VisitStatus.VISITED);
412                return;
413            }
414            if (node instanceof KillNodeDef) {
415                traversed.put(node.getName(), VisitStatus.VISITED);
416                return;
417            }
418            for (String transition : node.getTransitions()) {
419    
420                if (app.getNode(transition) == null) {
421                    throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
422                }
423    
424                //check if it is a cycle
425                if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
426                    throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
427                }
428                //ignore validated one
429                if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
430                    continue;
431                }
432    
433                traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
434                validate(app, app.getNode(transition), traversed);
435            }
436            traversed.put(node.getName(), VisitStatus.VISITED);
437        }
438    
439        /**
440         * Handle the global section
441         *
442         * @param ns
443         * @param global
444         * @param eActionConf
445         * @throws WorkflowException
446         */
447    
448        private void handleGlobal(Namespace ns, Element global, Element eActionConf) throws WorkflowException {
449    
450            // Use the action's namespace when getting children of the action (will be different than ns for extension actions)
451            Namespace actionNs = eActionConf.getNamespace();
452    
453            if (global != null) {
454                Element globalJobTracker = global.getChild("job-tracker", ns);
455                Element globalNameNode = global.getChild("name-node", ns);
456                List<Element> globalJobXml = global.getChildren("job-xml", ns);
457                Element globalConfiguration = global.getChild("configuration", ns);
458    
459                if (globalJobTracker != null && eActionConf.getChild("job-tracker", actionNs) == null) {
460                    Element jobTracker = new Element("job-tracker", actionNs);
461                    jobTracker.setText(globalJobTracker.getText());
462                    eActionConf.addContent(jobTracker);
463                }
464    
465                if (globalNameNode != null && eActionConf.getChild("name-node", actionNs) == null) {
466                    Element nameNode = new Element("name-node", actionNs);
467                    nameNode.setText(globalNameNode.getText());
468                    eActionConf.addContent(nameNode);
469                }
470    
471                if (!globalJobXml.isEmpty()) {
472                    List<Element> actionJobXml = eActionConf.getChildren("job-xml", actionNs);
473                    for(Element jobXml: globalJobXml){
474                        boolean alreadyExists = false;
475                        for(Element actionXml: actionJobXml){
476                            if(jobXml.getText().equals(actionXml.getText())){
477                                alreadyExists = true;
478                            }
479                        }
480    
481                        if (!alreadyExists){
482                            Element ejobXml = new Element("job-xml", actionNs);
483                            ejobXml.setText(jobXml.getText());
484                            eActionConf.addContent(ejobXml);
485                        }
486    
487                    }
488                }
489    
490                if (globalConfiguration != null) {
491                    Element actionConfiguration = eActionConf.getChild("configuration", actionNs);
492                    if (actionConfiguration == null) {
493                        actionConfiguration = new Element("configuration", actionNs);
494                        eActionConf.addContent(actionConfiguration);
495                    }
496                    for (Element globalConfig : (List<Element>) globalConfiguration.getChildren()) {
497                        boolean isSet = false;
498                        String globalVarName = globalConfig.getChildText("name", ns);
499                        for (Element local : (List<Element>) actionConfiguration.getChildren()) {
500                            if (local.getChildText("name", actionNs).equals(globalVarName)) {
501                                isSet = true;
502                            }
503                        }
504                        if (!isSet) {
505                            Element varToCopy = new Element("property", actionNs);
506                            Element varName = new Element("name", actionNs);
507                            Element varValue = new Element("value", actionNs);
508    
509                            varName.setText(globalConfig.getChildText("name", ns));
510                            varValue.setText(globalConfig.getChildText("value", ns));
511    
512                            varToCopy.addContent(varName);
513                            varToCopy.addContent(varValue);
514    
515                            actionConfiguration.addContent(varToCopy);
516                        }
517                    }
518                }
519            }
520            else {
521                ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(eActionConf.getName());
522                if (ae == null) {
523                    throw new WorkflowException(ErrorCode.E0723, "Unsupported action type");
524                }
525                if (ae.requiresNNJT) {
526    
527                    if (eActionConf.getChild("name-node", actionNs) == null) {
528                        throw new WorkflowException(ErrorCode.E0701, "No name-node defined");
529                    }
530                    if (eActionConf.getChild("job-tracker", actionNs) == null) {
531                        throw new WorkflowException(ErrorCode.E0701, "No job-tracker defined");
532                    }
533                }
534            }
535        }
536    
537    }