This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.workflow.lite;
019
020 import org.apache.oozie.workflow.WorkflowException;
021 import org.apache.oozie.util.IOUtils;
022 import org.apache.oozie.util.XmlUtils;
023 import org.apache.oozie.util.ParamChecker;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.service.Services;
026 import org.apache.oozie.service.ActionService;
027 import org.jdom.Element;
028 import org.jdom.JDOMException;
029 import org.jdom.Namespace;
030 import org.xml.sax.SAXException;
031
032 import javax.xml.transform.stream.StreamSource;
033 import javax.xml.validation.Schema;
034 import javax.xml.validation.Validator;
035 import java.io.IOException;
036 import java.io.Reader;
037 import java.io.StringReader;
038 import java.io.StringWriter;
039 import java.util.ArrayList;
040 import java.util.HashMap;
041 import java.util.HashSet;
042 import java.util.List;
043 import java.util.Map;
044 import java.util.Set;
045 import java.util.Stack;
046
047 /**
048 * Class to parse and validate workflow xml
049 */
050 public class LiteWorkflowAppParser {
051
052 private static final String DECISION_E = "decision";
053 private static final String ACTION_E = "action";
054 private static final String END_E = "end";
055 private static final String START_E = "start";
056 private static final String JOIN_E = "join";
057 private static final String FORK_E = "fork";
058 private static final Object KILL_E = "kill";
059
060 private static final String SLA_INFO = "info";
061 private static final String CREDENTIALS = "credentials";
062
063 private static final String NAME_A = "name";
064 private static final String CRED_A = "cred";
065 private static final String USER_RETRY_MAX_A = "retry-max";
066 private static final String USER_RETRY_INTERVAL_A = "retry-interval";
067 private static final String TO_A = "to";
068
069 private static final String FORK_PATH_E = "path";
070 private static final String FORK_START_A = "start";
071
072 private static final String ACTION_OK_E = "ok";
073 private static final String ACTION_ERROR_E = "error";
074
075 private static final String DECISION_SWITCH_E = "switch";
076 private static final String DECISION_CASE_E = "case";
077 private static final String DECISION_DEFAULT_E = "default";
078
079 private static final String KILL_MESSAGE_E = "message";
080 public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
081
082 private Schema schema;
083 private Class<? extends DecisionNodeHandler> decisionHandlerClass;
084 private Class<? extends ActionNodeHandler> actionHandlerClass;
085
086 private static enum VisitStatus {
087 VISITING, VISITED
088 }
089
090 private List<String> forkList = new ArrayList<String>();
091 private List<String> joinList = new ArrayList<String>();
092
093 public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
094 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
095 this.schema = schema;
096 this.decisionHandlerClass = decisionHandlerClass;
097 this.actionHandlerClass = actionHandlerClass;
098 }
099
100 /**
101 * Parse and validate xml to {@link LiteWorkflowApp}
102 *
103 * @param reader
104 * @return LiteWorkflowApp
105 * @throws WorkflowException
106 */
107 public LiteWorkflowApp validateAndParse(Reader reader) throws WorkflowException {
108 try {
109 StringWriter writer = new StringWriter();
110 IOUtils.copyCharStream(reader, writer);
111 String strDef = writer.toString();
112
113 if (schema != null) {
114 Validator validator = schema.newValidator();
115 validator.validate(new StreamSource(new StringReader(strDef)));
116 }
117
118 Element wfDefElement = XmlUtils.parseXml(strDef);
119 LiteWorkflowApp app = parse(strDef, wfDefElement);
120 Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
121 traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
122 validate(app, app.getNode(StartNodeDef.START), traversed);
123 //Validate whether fork/join are in pair or not
124 if (Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
125 validateForkJoin(app);
126 }
127 return app;
128 }
129 catch (JDOMException ex) {
130 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
131 }
132 catch (SAXException ex) {
133 throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
134 }
135 catch (IOException ex) {
136 throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
137 }
138 }
139
140 /**
141 * Validate whether fork/join are in pair or not
142 * @param app LiteWorkflowApp
143 * @throws WorkflowException
144 */
145 private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
146 // Make sure the number of forks and joins in wf are equal
147 if (forkList.size() != joinList.size()) {
148 throw new WorkflowException(ErrorCode.E0730);
149 }
150
151 while(!forkList.isEmpty()){
152 // Make sure each of the fork node has a corresponding join; start with the root fork Node first
153 validateFork(app.getNode(forkList.remove(0)), app);
154 }
155
156 }
157
158 /*
159 * Test whether the fork node has a corresponding join
160 * @param node - the fork node
161 * @param app - the WorkflowApp
162 * @return
163 * @throws WorkflowException
164 */
165 private NodeDef validateFork(NodeDef forkNode, LiteWorkflowApp app) throws WorkflowException {
166 List<String> transitions = new ArrayList<String>(forkNode.getTransitions());
167 // list for keeping track of "error-to" transitions of Action Node
168 List<String> errorToTransitions = new ArrayList<String>();
169 String joinNode = null;
170 for (int i = 0; i < transitions.size(); i++) {
171 NodeDef node = app.getNode(transitions.get(i));
172 if (node instanceof DecisionNodeDef) {
173 Set<String> decisionSet = new HashSet<String>(node.getTransitions());
174 for (String ds : decisionSet) {
175 if (transitions.contains(ds)) {
176 throw new WorkflowException(ErrorCode.E0734, node.getName(), ds);
177 } else {
178 transitions.add(ds);
179 }
180 }
181 } else if (node instanceof ActionNodeDef) {
182 // Make sure the transition is valid
183 validateTransition(errorToTransitions, transitions, app, node);
184 // Add the "ok-to" transition of node
185 transitions.add(node.getTransitions().get(0));
186 String errorTo = node.getTransitions().get(1);
187 // Add the "error-to" transition if the transition is a Action Node
188 if (app.getNode(errorTo) instanceof ActionNodeDef) {
189 errorToTransitions.add(errorTo);
190 }
191 } else if (node instanceof ForkNodeDef) {
192 forkList.remove(node.getName());
193 // Make a recursive call to resolve this fork node
194 NodeDef joinNd = validateFork(node, app);
195 // Make sure the transition is valid
196 validateTransition(errorToTransitions, transitions, app, node);
197 // Add the "ok-to" transition of node
198 transitions.add(joinNd.getTransitions().get(0));
199 } else if (node instanceof JoinNodeDef) {
200 // If joinNode encountered for the first time, remove it from the joinList and remember it
201 String currentJoin = node.getName();
202 if (joinList.contains(currentJoin)) {
203 joinList.remove(currentJoin);
204 joinNode = currentJoin;
205 } else {
206 // Make sure this join is the same as the join seen from the first time
207 if (joinNode == null) {
208 throw new WorkflowException(ErrorCode.E0733, forkNode);
209 }
210 if (!joinNode.equals(currentJoin)) {
211 throw new WorkflowException(ErrorCode.E0732, forkNode, joinNode);
212 }
213 }
214 } else {
215 throw new WorkflowException(ErrorCode.E0730);
216 }
217 }
218 return app.getNode(joinNode);
219
220 }
221
222 private void validateTransition(List<String> errorToTransitions, List<String> transitions, LiteWorkflowApp app, NodeDef node)
223 throws WorkflowException {
224 for (String transition : node.getTransitions()) {
225 // Make sure the transition node is either a join node or is not already visited
226 if (transitions.contains(transition) && !(app.getNode(transition) instanceof JoinNodeDef)) {
227 throw new WorkflowException(ErrorCode.E0734, node.getName(), transition);
228 }
229 // Make sure the transition node is not the same as an already visited 'error-to' transition
230 if (errorToTransitions.contains(transition)) {
231 throw new WorkflowException(ErrorCode.E0735, node.getName(), transition);
232 }
233 }
234
235 }
236
237 /**
238 * Parse xml to {@link LiteWorkflowApp}
239 *
240 * @param strDef
241 * @param root
242 * @return LiteWorkflowApp
243 * @throws WorkflowException
244 */
245 @SuppressWarnings({"unchecked", "ConstantConditions"})
246 private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException {
247 Namespace ns = root.getNamespace();
248 LiteWorkflowApp def = null;
249 for (Element eNode : (List<Element>) root.getChildren()) {
250 if (eNode.getName().equals(START_E)) {
251 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
252 new StartNodeDef(eNode.getAttributeValue(TO_A)));
253 }
254 else {
255 if (eNode.getName().equals(END_E)) {
256 def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A)));
257 }
258 else {
259 if (eNode.getName().equals(KILL_E)) {
260 def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), eNode.getChildText(KILL_MESSAGE_E, ns)));
261 }
262 else {
263 if (eNode.getName().equals(FORK_E)) {
264 List<String> paths = new ArrayList<String>();
265 for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
266 paths.add(tran.getAttributeValue(FORK_START_A));
267 }
268 def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), paths));
269 }
270 else {
271 if (eNode.getName().equals(JOIN_E)) {
272 def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), eNode.getAttributeValue(TO_A)));
273 }
274 else {
275 if (eNode.getName().equals(DECISION_E)) {
276 Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
277 List<String> transitions = new ArrayList<String>();
278 for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
279 transitions.add(e.getAttributeValue(TO_A));
280 }
281 transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
282
283 String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
284 def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
285 transitions));
286 }
287 else {
288 if (ACTION_E.equals(eNode.getName())) {
289 String[] transitions = new String[2];
290 Element eActionConf = null;
291 for (Element elem : (List<Element>) eNode.getChildren()) {
292 if (ACTION_OK_E.equals(elem.getName())) {
293 transitions[0] = elem.getAttributeValue(TO_A);
294 }
295 else {
296 if (ACTION_ERROR_E.equals(elem.getName())) {
297 transitions[1] = elem.getAttributeValue(TO_A);
298 }
299 else {
300 if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
301 continue;
302 }
303 else {
304 eActionConf = elem;
305 }
306 }
307 }
308 }
309
310 String credStr = eNode.getAttributeValue(CRED_A);
311 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
312 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
313
314 String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
315 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
316 transitions[0], transitions[1], credStr,
317 userRetryMaxStr, userRetryIntervalStr));
318 }
319 else {
320 if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
321 // No operation is required
322 }
323 else {
324 throw new WorkflowException(ErrorCode.E0703, eNode.getName());
325 }
326 }
327 }
328 }
329 }
330 }
331 }
332 }
333 }
334 return def;
335 }
336
337 /**
338 * Validate workflow xml
339 *
340 * @param app
341 * @param node
342 * @param traversed
343 * @throws WorkflowException
344 */
345 private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
346 if (!(node instanceof StartNodeDef)) {
347 try {
348 ParamChecker.validateActionName(node.getName());
349 }
350 catch (IllegalArgumentException ex) {
351 throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
352 }
353 }
354 if (node instanceof ActionNodeDef) {
355 try {
356 Element action = XmlUtils.parseXml(node.getConf());
357 boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
358 if (!supportedAction) {
359 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
360 }
361 }
362 catch (JDOMException ex) {
363 throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
364 }
365 }
366
367 if(node instanceof ForkNodeDef){
368 forkList.add(node.getName());
369 }
370
371 if(node instanceof JoinNodeDef){
372 joinList.add(node.getName());
373 }
374
375 if (node instanceof EndNodeDef) {
376 traversed.put(node.getName(), VisitStatus.VISITED);
377 return;
378 }
379 if (node instanceof KillNodeDef) {
380 traversed.put(node.getName(), VisitStatus.VISITED);
381 return;
382 }
383 for (String transition : node.getTransitions()) {
384
385 if (app.getNode(transition) == null) {
386 throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
387 }
388
389 //check if it is a cycle
390 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
391 throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
392 }
393 //ignore validated one
394 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
395 continue;
396 }
397
398 traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
399 validate(app, app.getNode(transition), traversed);
400 }
401 traversed.put(node.getName(), VisitStatus.VISITED);
402 }
403 }