001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.workflow.lite;
019    
020    import org.apache.oozie.workflow.WorkflowException;
021    import org.apache.oozie.util.IOUtils;
022    import org.apache.oozie.util.XmlUtils;
023    import org.apache.oozie.util.ParamChecker;
024    import org.apache.oozie.ErrorCode;
025    import org.apache.oozie.service.Services;
026    import org.apache.oozie.service.ActionService;
027    import org.jdom.Element;
028    import org.jdom.JDOMException;
029    import org.jdom.Namespace;
030    import org.xml.sax.SAXException;
031    
032    import javax.xml.transform.stream.StreamSource;
033    import javax.xml.validation.Schema;
034    import javax.xml.validation.Validator;
035    import java.io.IOException;
036    import java.io.Reader;
037    import java.io.StringReader;
038    import java.io.StringWriter;
039    import java.util.ArrayList;
040    import java.util.HashMap;
041    import java.util.HashSet;
042    import java.util.List;
043    import java.util.Map;
044    import java.util.Set;
045    import java.util.Stack;
046    
047    /**
048     * Class to parse and validate workflow xml
049     */
050    public class LiteWorkflowAppParser {
051    
052        private static final String DECISION_E = "decision";
053        private static final String ACTION_E = "action";
054        private static final String END_E = "end";
055        private static final String START_E = "start";
056        private static final String JOIN_E = "join";
057        private static final String FORK_E = "fork";
058        private static final Object KILL_E = "kill";
059    
060        private static final String SLA_INFO = "info";
061        private static final String CREDENTIALS = "credentials";
062    
063        private static final String NAME_A = "name";
064        private static final String CRED_A = "cred";
065        private static final String USER_RETRY_MAX_A = "retry-max";
066        private static final String USER_RETRY_INTERVAL_A = "retry-interval";
067        private static final String TO_A = "to";
068    
069        private static final String FORK_PATH_E = "path";
070        private static final String FORK_START_A = "start";
071    
072        private static final String ACTION_OK_E = "ok";
073        private static final String ACTION_ERROR_E = "error";
074    
075        private static final String DECISION_SWITCH_E = "switch";
076        private static final String DECISION_CASE_E = "case";
077        private static final String DECISION_DEFAULT_E = "default";
078    
079        private static final String KILL_MESSAGE_E = "message";
080        public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
081    
082        private Schema schema;
083        private Class<? extends DecisionNodeHandler> decisionHandlerClass;
084        private Class<? extends ActionNodeHandler> actionHandlerClass;
085    
086        private static enum VisitStatus {
087            VISITING, VISITED
088        }
089    
090        private List<String> forkList = new ArrayList<String>();
091        private List<String> joinList = new ArrayList<String>();
092    
093        public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
094                                     Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
095            this.schema = schema;
096            this.decisionHandlerClass = decisionHandlerClass;
097            this.actionHandlerClass = actionHandlerClass;
098        }
099    
100        /**
101         * Parse and validate xml to {@link LiteWorkflowApp}
102         *
103         * @param reader
104         * @return LiteWorkflowApp
105         * @throws WorkflowException
106         */
107        public LiteWorkflowApp validateAndParse(Reader reader) throws WorkflowException {
108            try {
109                StringWriter writer = new StringWriter();
110                IOUtils.copyCharStream(reader, writer);
111                String strDef = writer.toString();
112    
113                if (schema != null) {
114                    Validator validator = schema.newValidator();
115                    validator.validate(new StreamSource(new StringReader(strDef)));
116                }
117    
118                Element wfDefElement = XmlUtils.parseXml(strDef);
119                LiteWorkflowApp app = parse(strDef, wfDefElement);
120                Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
121                traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
122                validate(app, app.getNode(StartNodeDef.START), traversed);
123                //Validate whether fork/join are in pair or not
124                if (Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
125                    validateForkJoin(app);
126                }
127                return app;
128            }
129            catch (JDOMException ex) {
130                throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
131            }
132            catch (SAXException ex) {
133                throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
134            }
135            catch (IOException ex) {
136                throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
137            }
138        }
139    
140        /**
141         * Validate whether fork/join are in pair or not
142         * @param app LiteWorkflowApp
143         * @throws WorkflowException
144         */
145        private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
146            // Make sure the number of forks and joins in wf are equal
147            if (forkList.size() != joinList.size()) {
148                throw new WorkflowException(ErrorCode.E0730);
149            }
150    
151            while(!forkList.isEmpty()){
152                // Make sure each of the fork node has a corresponding join; start with the root fork Node first
153                validateFork(app.getNode(forkList.remove(0)), app);
154            }
155    
156        }
157    
158        /*
159         * Test whether the fork node has a corresponding join
160         * @param node - the fork node
161         * @param app - the WorkflowApp
162         * @return
163         * @throws WorkflowException
164         */
165        private NodeDef validateFork(NodeDef forkNode, LiteWorkflowApp app) throws WorkflowException {
166            List<String> transitions = new ArrayList<String>(forkNode.getTransitions());
167            // list for keeping track of "error-to" transitions of Action Node
168            List<String> errorToTransitions = new ArrayList<String>();
169            String joinNode = null;
170            for (int i = 0; i < transitions.size(); i++) {
171                NodeDef node = app.getNode(transitions.get(i));
172                if (node instanceof DecisionNodeDef) {
173                    Set<String> decisionSet = new HashSet<String>(node.getTransitions());
174                    for (String ds : decisionSet) {
175                        if (transitions.contains(ds)) {
176                            throw new WorkflowException(ErrorCode.E0734, node.getName(), ds);
177                        } else {
178                            transitions.add(ds);
179                        }
180                    }
181                } else if (node instanceof ActionNodeDef) {
182                    // Make sure the transition is valid
183                    validateTransition(errorToTransitions, transitions, app, node);
184                    // Add the "ok-to" transition of node
185                    transitions.add(node.getTransitions().get(0));
186                    String errorTo = node.getTransitions().get(1);
187                    // Add the "error-to" transition if the transition is a Action Node
188                    if (app.getNode(errorTo) instanceof ActionNodeDef) {
189                        errorToTransitions.add(errorTo);
190                    }
191                } else if (node instanceof ForkNodeDef) {
192                    forkList.remove(node.getName());
193                    // Make a recursive call to resolve this fork node
194                    NodeDef joinNd = validateFork(node, app);
195                    // Make sure the transition is valid
196                    validateTransition(errorToTransitions, transitions, app, node);
197                    // Add the "ok-to" transition of node
198                    transitions.add(joinNd.getTransitions().get(0));
199                } else if (node instanceof JoinNodeDef) {
200                    // If joinNode encountered for the first time, remove it from the joinList and remember it
201                    String currentJoin = node.getName();
202                    if (joinList.contains(currentJoin)) {
203                        joinList.remove(currentJoin);
204                        joinNode = currentJoin;
205                    } else {
206                        // Make sure this join is the same as the join seen from the first time
207                        if (joinNode == null) {
208                            throw new WorkflowException(ErrorCode.E0733, forkNode);
209                        }
210                        if (!joinNode.equals(currentJoin)) {
211                            throw new WorkflowException(ErrorCode.E0732, forkNode, joinNode);
212                        }
213                    }
214                } else {
215                    throw new WorkflowException(ErrorCode.E0730);
216                }
217            }
218            return app.getNode(joinNode);
219    
220        }
221    
222        private void validateTransition(List<String> errorToTransitions, List<String> transitions, LiteWorkflowApp app, NodeDef node)
223                throws WorkflowException {
224            for (String transition : node.getTransitions()) {
225                // Make sure the transition node is either a join node or is not already visited
226                if (transitions.contains(transition) && !(app.getNode(transition) instanceof JoinNodeDef)) {
227                    throw new WorkflowException(ErrorCode.E0734, node.getName(), transition);
228                }
229                // Make sure the transition node is not the same as an already visited 'error-to' transition
230                if (errorToTransitions.contains(transition)) {
231                    throw new WorkflowException(ErrorCode.E0735, node.getName(), transition);
232                }
233            }
234    
235        }
236    
237        /**
238         * Parse xml to {@link LiteWorkflowApp}
239         *
240         * @param strDef
241         * @param root
242         * @return LiteWorkflowApp
243         * @throws WorkflowException
244         */
245        @SuppressWarnings({"unchecked", "ConstantConditions"})
246        private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException {
247            Namespace ns = root.getNamespace();
248            LiteWorkflowApp def = null;
249            for (Element eNode : (List<Element>) root.getChildren()) {
250                if (eNode.getName().equals(START_E)) {
251                    def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
252                                              new StartNodeDef(eNode.getAttributeValue(TO_A)));
253                }
254                else {
255                    if (eNode.getName().equals(END_E)) {
256                        def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A)));
257                    }
258                    else {
259                        if (eNode.getName().equals(KILL_E)) {
260                            def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), eNode.getChildText(KILL_MESSAGE_E, ns)));
261                        }
262                        else {
263                            if (eNode.getName().equals(FORK_E)) {
264                                List<String> paths = new ArrayList<String>();
265                                for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
266                                    paths.add(tran.getAttributeValue(FORK_START_A));
267                                }
268                                def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), paths));
269                            }
270                            else {
271                                if (eNode.getName().equals(JOIN_E)) {
272                                    def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), eNode.getAttributeValue(TO_A)));
273                                }
274                                else {
275                                    if (eNode.getName().equals(DECISION_E)) {
276                                        Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
277                                        List<String> transitions = new ArrayList<String>();
278                                        for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
279                                            transitions.add(e.getAttributeValue(TO_A));
280                                        }
281                                        transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
282    
283                                        String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
284                                        def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
285                                                                        transitions));
286                                    }
287                                    else {
288                                        if (ACTION_E.equals(eNode.getName())) {
289                                            String[] transitions = new String[2];
290                                            Element eActionConf = null;
291                                            for (Element elem : (List<Element>) eNode.getChildren()) {
292                                                if (ACTION_OK_E.equals(elem.getName())) {
293                                                    transitions[0] = elem.getAttributeValue(TO_A);
294                                                }
295                                                else {
296                                                    if (ACTION_ERROR_E.equals(elem.getName())) {
297                                                        transitions[1] = elem.getAttributeValue(TO_A);
298                                                    }
299                                                    else {
300                                                        if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
301                                                            continue;
302                                                        }
303                                                        else {
304                                                            eActionConf = elem;
305                                                        }
306                                                    }
307                                                }
308                                            }
309    
310                                            String credStr = eNode.getAttributeValue(CRED_A);
311                                            String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
312                                            String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
313    
314                                            String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
315                                            def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
316                                                                          transitions[0], transitions[1], credStr,
317                                                                          userRetryMaxStr, userRetryIntervalStr));
318                                        }
319                                        else {
320                                            if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
321                                                // No operation is required
322                                            }
323                                            else {
324                                                throw new WorkflowException(ErrorCode.E0703, eNode.getName());
325                                            }
326                                        }
327                                    }
328                                }
329                            }
330                        }
331                    }
332                }
333            }
334            return def;
335        }
336    
337        /**
338         * Validate workflow xml
339         *
340         * @param app
341         * @param node
342         * @param traversed
343         * @throws WorkflowException
344         */
345        private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
346            if (!(node instanceof StartNodeDef)) {
347                try {
348                    ParamChecker.validateActionName(node.getName());
349                }
350                catch (IllegalArgumentException ex) {
351                    throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
352                }
353            }
354            if (node instanceof ActionNodeDef) {
355                try {
356                    Element action = XmlUtils.parseXml(node.getConf());
357                    boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
358                    if (!supportedAction) {
359                        throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
360                    }
361                }
362                catch (JDOMException ex) {
363                    throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
364                }
365            }
366    
367            if(node instanceof ForkNodeDef){
368                forkList.add(node.getName());
369            }
370    
371            if(node instanceof JoinNodeDef){
372                joinList.add(node.getName());
373            }
374    
375            if (node instanceof EndNodeDef) {
376                traversed.put(node.getName(), VisitStatus.VISITED);
377                return;
378            }
379            if (node instanceof KillNodeDef) {
380                traversed.put(node.getName(), VisitStatus.VISITED);
381                return;
382            }
383            for (String transition : node.getTransitions()) {
384    
385                if (app.getNode(transition) == null) {
386                    throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
387                }
388    
389                //check if it is a cycle
390                if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
391                    throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
392                }
393                //ignore validated one
394                if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
395                    continue;
396                }
397    
398                traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
399                validate(app, app.getNode(transition), traversed);
400            }
401            traversed.put(node.getName(), VisitStatus.VISITED);
402        }
403    }