This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.workflow.lite;
019
020 import org.apache.oozie.workflow.WorkflowException;
021 import org.apache.oozie.util.IOUtils;
022 import org.apache.oozie.util.XmlUtils;
023 import org.apache.oozie.util.ParamChecker;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.service.Services;
026 import org.apache.oozie.service.ActionService;
027 import org.jdom.Element;
028 import org.jdom.JDOMException;
029 import org.jdom.Namespace;
030 import org.xml.sax.SAXException;
031
032 import javax.xml.transform.stream.StreamSource;
033 import javax.xml.validation.Schema;
034 import javax.xml.validation.Validator;
035 import java.io.IOException;
036 import java.io.Reader;
037 import java.io.StringReader;
038 import java.io.StringWriter;
039 import java.util.ArrayList;
040 import java.util.HashMap;
041 import java.util.List;
042 import java.util.Map;
043
044 /**
045 * Class to parse and validate workflow xml
046 */
047 public class LiteWorkflowAppParser {
048
049 private static final String DECISION_E = "decision";
050 private static final String ACTION_E = "action";
051 private static final String END_E = "end";
052 private static final String START_E = "start";
053 private static final String JOIN_E = "join";
054 private static final String FORK_E = "fork";
055 private static final Object KILL_E = "kill";
056
057 private static final String SLA_INFO = "info";
058 private static final String CREDENTIALS = "credentials";
059
060 private static final String NAME_A = "name";
061 private static final String CRED_A = "cred";
062 private static final String USER_RETRY_MAX_A = "retry-max";
063 private static final String USER_RETRY_INTERVAL_A = "retry-interval";
064 private static final String TO_A = "to";
065
066 private static final String FORK_PATH_E = "path";
067 private static final String FORK_START_A = "start";
068
069 private static final String ACTION_OK_E = "ok";
070 private static final String ACTION_ERROR_E = "error";
071
072 private static final String DECISION_SWITCH_E = "switch";
073 private static final String DECISION_CASE_E = "case";
074 private static final String DECISION_DEFAULT_E = "default";
075
076 private static final String KILL_MESSAGE_E = "message";
077
078 private Schema schema;
079 private Class<? extends DecisionNodeHandler> decisionHandlerClass;
080 private Class<? extends ActionNodeHandler> actionHandlerClass;
081
082 private static enum VisitStatus {
083 VISITING, VISITED
084 }
085
086 ;
087
088
089 public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
090 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
091 this.schema = schema;
092 this.decisionHandlerClass = decisionHandlerClass;
093 this.actionHandlerClass = actionHandlerClass;
094 }
095
096 /**
097 * Parse and validate xml to {@link LiteWorkflowApp}
098 *
099 * @param reader
100 * @return LiteWorkflowApp
101 * @throws WorkflowException
102 */
103 public LiteWorkflowApp validateAndParse(Reader reader) throws WorkflowException {
104 try {
105 StringWriter writer = new StringWriter();
106 IOUtils.copyCharStream(reader, writer);
107 String strDef = writer.toString();
108
109 if (schema != null) {
110 Validator validator = schema.newValidator();
111 validator.validate(new StreamSource(new StringReader(strDef)));
112 }
113
114 Element wfDefElement = XmlUtils.parseXml(strDef);
115 LiteWorkflowApp app = parse(strDef, wfDefElement);
116 Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
117 traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
118 validate(app, app.getNode(StartNodeDef.START), traversed);
119 return app;
120 }
121 catch (JDOMException ex) {
122 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
123 }
124 catch (SAXException ex) {
125 throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
126 }
127 catch (IOException ex) {
128 throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
129 }
130 }
131
132 /**
133 * Parse xml to {@link LiteWorkflowApp}
134 *
135 * @param strDef
136 * @param root
137 * @return LiteWorkflowApp
138 * @throws WorkflowException
139 */
140 @SuppressWarnings({"unchecked", "ConstantConditions"})
141 private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException {
142 Namespace ns = root.getNamespace();
143 LiteWorkflowApp def = null;
144 for (Element eNode : (List<Element>) root.getChildren()) {
145 if (eNode.getName().equals(START_E)) {
146 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
147 new StartNodeDef(eNode.getAttributeValue(TO_A)));
148 }
149 else {
150 if (eNode.getName().equals(END_E)) {
151 def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A)));
152 }
153 else {
154 if (eNode.getName().equals(KILL_E)) {
155 def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), eNode.getChildText(KILL_MESSAGE_E, ns)));
156 }
157 else {
158 if (eNode.getName().equals(FORK_E)) {
159 List<String> paths = new ArrayList<String>();
160 for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
161 paths.add(tran.getAttributeValue(FORK_START_A));
162 }
163 def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), paths));
164 }
165 else {
166 if (eNode.getName().equals(JOIN_E)) {
167 def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), eNode.getAttributeValue(TO_A)));
168 }
169 else {
170 if (eNode.getName().equals(DECISION_E)) {
171 Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
172 List<String> transitions = new ArrayList<String>();
173 for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
174 transitions.add(e.getAttributeValue(TO_A));
175 }
176 transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
177
178 String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
179 def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
180 transitions));
181 }
182 else {
183 if (ACTION_E.equals(eNode.getName())) {
184 String[] transitions = new String[2];
185 Element eActionConf = null;
186 for (Element elem : (List<Element>) eNode.getChildren()) {
187 if (ACTION_OK_E.equals(elem.getName())) {
188 transitions[0] = elem.getAttributeValue(TO_A);
189 }
190 else {
191 if (ACTION_ERROR_E.equals(elem.getName())) {
192 transitions[1] = elem.getAttributeValue(TO_A);
193 }
194 else {
195 if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
196 continue;
197 }
198 else {
199 eActionConf = elem;
200 }
201 }
202 }
203 }
204
205 String credStr = eNode.getAttributeValue(CRED_A);
206 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
207 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
208
209 String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
210 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
211 transitions[0], transitions[1], credStr,
212 userRetryMaxStr, userRetryIntervalStr));
213 }
214 else {
215 if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
216 // No operation is required
217 }
218 else {
219 throw new WorkflowException(ErrorCode.E0703, eNode.getName());
220 }
221 }
222 }
223 }
224 }
225 }
226 }
227 }
228 }
229 return def;
230 }
231
232 /**
233 * Validate workflow xml
234 *
235 * @param app
236 * @param node
237 * @param traversed
238 * @throws WorkflowException
239 */
240 private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
241 if (!(node instanceof StartNodeDef)) {
242 try {
243 ParamChecker.validateActionName(node.getName());
244 }
245 catch (IllegalArgumentException ex) {
246 throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
247 }
248 }
249 if (node instanceof ActionNodeDef) {
250 try {
251 Element action = XmlUtils.parseXml(node.getConf());
252 boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
253 if (!supportedAction) {
254 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
255 }
256 }
257 catch (JDOMException ex) {
258 throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
259 }
260 }
261
262 if (node instanceof EndNodeDef) {
263 traversed.put(node.getName(), VisitStatus.VISITED);
264 return;
265 }
266 if (node instanceof KillNodeDef) {
267 traversed.put(node.getName(), VisitStatus.VISITED);
268 return;
269 }
270 for (String transition : node.getTransitions()) {
271
272 if (app.getNode(transition) == null) {
273 throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
274 }
275
276 //check if it is a cycle
277 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
278 throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
279 }
280 //ignore validated one
281 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
282 continue;
283 }
284
285 traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
286 validate(app, app.getNode(transition), traversed);
287 }
288 traversed.put(node.getName(), VisitStatus.VISITED);
289 }
290 }