001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.workflow.lite;
019    
020    import org.apache.oozie.workflow.WorkflowException;
021    import org.apache.oozie.util.IOUtils;
022    import org.apache.oozie.util.XmlUtils;
023    import org.apache.oozie.util.ParamChecker;
024    import org.apache.oozie.ErrorCode;
025    import org.apache.oozie.service.Services;
026    import org.apache.oozie.service.ActionService;
027    import org.jdom.Element;
028    import org.jdom.JDOMException;
029    import org.jdom.Namespace;
030    import org.xml.sax.SAXException;
031    
032    import javax.xml.transform.stream.StreamSource;
033    import javax.xml.validation.Schema;
034    import javax.xml.validation.Validator;
035    import java.io.IOException;
036    import java.io.Reader;
037    import java.io.StringReader;
038    import java.io.StringWriter;
039    import java.util.ArrayList;
040    import java.util.HashMap;
041    import java.util.List;
042    import java.util.Map;
043    
044    /**
045     * Class to parse and validate workflow xml
046     */
047    public class LiteWorkflowAppParser {
048    
049        private static final String DECISION_E = "decision";
050        private static final String ACTION_E = "action";
051        private static final String END_E = "end";
052        private static final String START_E = "start";
053        private static final String JOIN_E = "join";
054        private static final String FORK_E = "fork";
055        private static final Object KILL_E = "kill";
056    
057        private static final String SLA_INFO = "info";
058        private static final String CREDENTIALS = "credentials";
059    
060        private static final String NAME_A = "name";
061        private static final String CRED_A = "cred";
062        private static final String USER_RETRY_MAX_A = "retry-max";
063        private static final String USER_RETRY_INTERVAL_A = "retry-interval";
064        private static final String TO_A = "to";
065    
066        private static final String FORK_PATH_E = "path";
067        private static final String FORK_START_A = "start";
068    
069        private static final String ACTION_OK_E = "ok";
070        private static final String ACTION_ERROR_E = "error";
071    
072        private static final String DECISION_SWITCH_E = "switch";
073        private static final String DECISION_CASE_E = "case";
074        private static final String DECISION_DEFAULT_E = "default";
075    
076        private static final String KILL_MESSAGE_E = "message";
077    
078        private Schema schema;
079        private Class<? extends DecisionNodeHandler> decisionHandlerClass;
080        private Class<? extends ActionNodeHandler> actionHandlerClass;
081    
082        private static enum VisitStatus {
083            VISITING, VISITED
084        }
085    
086        ;
087    
088    
089        public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
090                                     Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
091            this.schema = schema;
092            this.decisionHandlerClass = decisionHandlerClass;
093            this.actionHandlerClass = actionHandlerClass;
094        }
095    
096        /**
097         * Parse and validate xml to {@link LiteWorkflowApp}
098         *
099         * @param reader
100         * @return LiteWorkflowApp
101         * @throws WorkflowException
102         */
103        public LiteWorkflowApp validateAndParse(Reader reader) throws WorkflowException {
104            try {
105                StringWriter writer = new StringWriter();
106                IOUtils.copyCharStream(reader, writer);
107                String strDef = writer.toString();
108    
109                if (schema != null) {
110                    Validator validator = schema.newValidator();
111                    validator.validate(new StreamSource(new StringReader(strDef)));
112                }
113    
114                Element wfDefElement = XmlUtils.parseXml(strDef);
115                LiteWorkflowApp app = parse(strDef, wfDefElement);
116                Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
117                traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
118                validate(app, app.getNode(StartNodeDef.START), traversed);
119                return app;
120            }
121            catch (JDOMException ex) {
122                throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
123            }
124            catch (SAXException ex) {
125                throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
126            }
127            catch (IOException ex) {
128                throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
129            }
130        }
131    
132        /**
133         * Parse xml to {@link LiteWorkflowApp}
134         *
135         * @param strDef
136         * @param root
137         * @return LiteWorkflowApp
138         * @throws WorkflowException
139         */
140        @SuppressWarnings({"unchecked", "ConstantConditions"})
141        private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException {
142            Namespace ns = root.getNamespace();
143            LiteWorkflowApp def = null;
144            for (Element eNode : (List<Element>) root.getChildren()) {
145                if (eNode.getName().equals(START_E)) {
146                    def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
147                                              new StartNodeDef(eNode.getAttributeValue(TO_A)));
148                }
149                else {
150                    if (eNode.getName().equals(END_E)) {
151                        def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A)));
152                    }
153                    else {
154                        if (eNode.getName().equals(KILL_E)) {
155                            def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), eNode.getChildText(KILL_MESSAGE_E, ns)));
156                        }
157                        else {
158                            if (eNode.getName().equals(FORK_E)) {
159                                List<String> paths = new ArrayList<String>();
160                                for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
161                                    paths.add(tran.getAttributeValue(FORK_START_A));
162                                }
163                                def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), paths));
164                            }
165                            else {
166                                if (eNode.getName().equals(JOIN_E)) {
167                                    def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), eNode.getAttributeValue(TO_A)));
168                                }
169                                else {
170                                    if (eNode.getName().equals(DECISION_E)) {
171                                        Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
172                                        List<String> transitions = new ArrayList<String>();
173                                        for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
174                                            transitions.add(e.getAttributeValue(TO_A));
175                                        }
176                                        transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
177    
178                                        String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
179                                        def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
180                                                                        transitions));
181                                    }
182                                    else {
183                                        if (ACTION_E.equals(eNode.getName())) {
184                                            String[] transitions = new String[2];
185                                            Element eActionConf = null;
186                                            for (Element elem : (List<Element>) eNode.getChildren()) {
187                                                if (ACTION_OK_E.equals(elem.getName())) {
188                                                    transitions[0] = elem.getAttributeValue(TO_A);
189                                                }
190                                                else {
191                                                    if (ACTION_ERROR_E.equals(elem.getName())) {
192                                                        transitions[1] = elem.getAttributeValue(TO_A);
193                                                    }
194                                                    else {
195                                                        if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
196                                                            continue;
197                                                        }
198                                                        else {
199                                                            eActionConf = elem;
200                                                        }
201                                                    }
202                                                }
203                                            }
204                                            
205                                            String credStr = eNode.getAttributeValue(CRED_A);
206                                            String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
207                                            String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
208                                            
209                                            String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
210                                            def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
211                                                                          transitions[0], transitions[1], credStr,
212                                                                          userRetryMaxStr, userRetryIntervalStr));
213                                        }
214                                        else {
215                                            if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
216                                                // No operation is required
217                                            }
218                                            else {
219                                                throw new WorkflowException(ErrorCode.E0703, eNode.getName());
220                                            }
221                                        }
222                                    }
223                                }
224                            }
225                        }
226                    }
227                }
228            }
229            return def;
230        }
231    
232        /**
233         * Validate workflow xml
234         *
235         * @param app
236         * @param node
237         * @param traversed
238         * @throws WorkflowException
239         */
240        private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
241            if (!(node instanceof StartNodeDef)) {
242                try {
243                    ParamChecker.validateActionName(node.getName());
244                }
245                catch (IllegalArgumentException ex) {
246                    throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
247                }
248            }
249            if (node instanceof ActionNodeDef) {
250                try {
251                    Element action = XmlUtils.parseXml(node.getConf());
252                    boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
253                    if (!supportedAction) {
254                        throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
255                    }
256                }
257                catch (JDOMException ex) {
258                    throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
259                }
260            }
261    
262            if (node instanceof EndNodeDef) {
263                traversed.put(node.getName(), VisitStatus.VISITED);
264                return;
265            }
266            if (node instanceof KillNodeDef) {
267                traversed.put(node.getName(), VisitStatus.VISITED);
268                return;
269            }
270            for (String transition : node.getTransitions()) {
271    
272                if (app.getNode(transition) == null) {
273                    throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
274                }
275    
276                //check if it is a cycle
277                if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
278                    throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
279                }
280                //ignore validated one
281                if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
282                    continue;
283                }
284    
285                traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
286                validate(app, app.getNode(transition), traversed);
287            }
288            traversed.put(node.getName(), VisitStatus.VISITED);
289        }
290    }