001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.workflow.lite; 019 020 import org.apache.oozie.workflow.WorkflowException; 021 import org.apache.oozie.util.IOUtils; 022 import org.apache.oozie.util.XmlUtils; 023 import org.apache.oozie.util.ParamChecker; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.service.Services; 026 import org.apache.oozie.service.ActionService; 027 import org.jdom.Element; 028 import org.jdom.JDOMException; 029 import org.jdom.Namespace; 030 import org.xml.sax.SAXException; 031 032 import javax.xml.transform.stream.StreamSource; 033 import javax.xml.validation.Schema; 034 import javax.xml.validation.Validator; 035 import java.io.IOException; 036 import java.io.Reader; 037 import java.io.StringReader; 038 import java.io.StringWriter; 039 import java.util.ArrayList; 040 import java.util.HashMap; 041 import java.util.List; 042 import java.util.Map; 043 044 /** 045 * Class to parse and validate workflow xml 046 */ 047 public class LiteWorkflowAppParser { 048 049 private static final String DECISION_E = "decision"; 050 private static final String ACTION_E = "action"; 051 private static final String END_E = "end"; 052 private static final String START_E = "start"; 053 private static final String JOIN_E = "join"; 054 private static final String FORK_E = "fork"; 055 private static final Object KILL_E = "kill"; 056 057 private static final String SLA_INFO = "info"; 058 private static final String CREDENTIALS = "credentials"; 059 060 private static final String NAME_A = "name"; 061 private static final String CRED_A = "cred"; 062 private static final String USER_RETRY_MAX_A = "retry-max"; 063 private static final String USER_RETRY_INTERVAL_A = "retry-interval"; 064 private static final String TO_A = "to"; 065 066 private static final String FORK_PATH_E = "path"; 067 private static final String FORK_START_A = "start"; 068 069 private static final String ACTION_OK_E = "ok"; 070 private static final String ACTION_ERROR_E = "error"; 071 072 private static final String DECISION_SWITCH_E = "switch"; 073 private static final String DECISION_CASE_E = "case"; 074 private static final String DECISION_DEFAULT_E = "default"; 075 076 private static final String KILL_MESSAGE_E = "message"; 077 078 private Schema schema; 079 private Class<? extends DecisionNodeHandler> decisionHandlerClass; 080 private Class<? extends ActionNodeHandler> actionHandlerClass; 081 082 private static enum VisitStatus { 083 VISITING, VISITED 084 } 085 086 ; 087 088 089 public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass, 090 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException { 091 this.schema = schema; 092 this.decisionHandlerClass = decisionHandlerClass; 093 this.actionHandlerClass = actionHandlerClass; 094 } 095 096 /** 097 * Parse and validate xml to {@link LiteWorkflowApp} 098 * 099 * @param reader 100 * @return LiteWorkflowApp 101 * @throws WorkflowException 102 */ 103 public LiteWorkflowApp validateAndParse(Reader reader) throws WorkflowException { 104 try { 105 StringWriter writer = new StringWriter(); 106 IOUtils.copyCharStream(reader, writer); 107 String strDef = writer.toString(); 108 109 if (schema != null) { 110 Validator validator = schema.newValidator(); 111 validator.validate(new StreamSource(new StringReader(strDef))); 112 } 113 114 Element wfDefElement = XmlUtils.parseXml(strDef); 115 LiteWorkflowApp app = parse(strDef, wfDefElement); 116 Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>(); 117 traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING); 118 validate(app, app.getNode(StartNodeDef.START), traversed); 119 return app; 120 } 121 catch (JDOMException ex) { 122 throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex); 123 } 124 catch (SAXException ex) { 125 throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex); 126 } 127 catch (IOException ex) { 128 throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex); 129 } 130 } 131 132 /** 133 * Parse xml to {@link LiteWorkflowApp} 134 * 135 * @param strDef 136 * @param root 137 * @return LiteWorkflowApp 138 * @throws WorkflowException 139 */ 140 @SuppressWarnings({"unchecked", "ConstantConditions"}) 141 private LiteWorkflowApp parse(String strDef, Element root) throws WorkflowException { 142 Namespace ns = root.getNamespace(); 143 LiteWorkflowApp def = null; 144 for (Element eNode : (List<Element>) root.getChildren()) { 145 if (eNode.getName().equals(START_E)) { 146 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef, 147 new StartNodeDef(eNode.getAttributeValue(TO_A))); 148 } 149 else { 150 if (eNode.getName().equals(END_E)) { 151 def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A))); 152 } 153 else { 154 if (eNode.getName().equals(KILL_E)) { 155 def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), eNode.getChildText(KILL_MESSAGE_E, ns))); 156 } 157 else { 158 if (eNode.getName().equals(FORK_E)) { 159 List<String> paths = new ArrayList<String>(); 160 for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) { 161 paths.add(tran.getAttributeValue(FORK_START_A)); 162 } 163 def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), paths)); 164 } 165 else { 166 if (eNode.getName().equals(JOIN_E)) { 167 def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), eNode.getAttributeValue(TO_A))); 168 } 169 else { 170 if (eNode.getName().equals(DECISION_E)) { 171 Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns); 172 List<String> transitions = new ArrayList<String>(); 173 for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) { 174 transitions.add(e.getAttributeValue(TO_A)); 175 } 176 transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A)); 177 178 String switchStatement = XmlUtils.prettyPrint(eSwitch).toString(); 179 def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass, 180 transitions)); 181 } 182 else { 183 if (ACTION_E.equals(eNode.getName())) { 184 String[] transitions = new String[2]; 185 Element eActionConf = null; 186 for (Element elem : (List<Element>) eNode.getChildren()) { 187 if (ACTION_OK_E.equals(elem.getName())) { 188 transitions[0] = elem.getAttributeValue(TO_A); 189 } 190 else { 191 if (ACTION_ERROR_E.equals(elem.getName())) { 192 transitions[1] = elem.getAttributeValue(TO_A); 193 } 194 else { 195 if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) { 196 continue; 197 } 198 else { 199 eActionConf = elem; 200 } 201 } 202 } 203 } 204 205 String credStr = eNode.getAttributeValue(CRED_A); 206 String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A); 207 String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A); 208 209 String actionConf = XmlUtils.prettyPrint(eActionConf).toString(); 210 def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass, 211 transitions[0], transitions[1], credStr, 212 userRetryMaxStr, userRetryIntervalStr)); 213 } 214 else { 215 if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) { 216 // No operation is required 217 } 218 else { 219 throw new WorkflowException(ErrorCode.E0703, eNode.getName()); 220 } 221 } 222 } 223 } 224 } 225 } 226 } 227 } 228 } 229 return def; 230 } 231 232 /** 233 * Validate workflow xml 234 * 235 * @param app 236 * @param node 237 * @param traversed 238 * @throws WorkflowException 239 */ 240 private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException { 241 if (!(node instanceof StartNodeDef)) { 242 try { 243 ParamChecker.validateActionName(node.getName()); 244 } 245 catch (IllegalArgumentException ex) { 246 throw new WorkflowException(ErrorCode.E0724, ex.getMessage()); 247 } 248 } 249 if (node instanceof ActionNodeDef) { 250 try { 251 Element action = XmlUtils.parseXml(node.getConf()); 252 boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null; 253 if (!supportedAction) { 254 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName()); 255 } 256 } 257 catch (JDOMException ex) { 258 throw new RuntimeException("It should never happen, " + ex.getMessage(), ex); 259 } 260 } 261 262 if (node instanceof EndNodeDef) { 263 traversed.put(node.getName(), VisitStatus.VISITED); 264 return; 265 } 266 if (node instanceof KillNodeDef) { 267 traversed.put(node.getName(), VisitStatus.VISITED); 268 return; 269 } 270 for (String transition : node.getTransitions()) { 271 272 if (app.getNode(transition) == null) { 273 throw new WorkflowException(ErrorCode.E0708, node.getName(), transition); 274 } 275 276 //check if it is a cycle 277 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) { 278 throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName()); 279 } 280 //ignore validated one 281 if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) { 282 continue; 283 } 284 285 traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING); 286 validate(app, app.getNode(transition), traversed); 287 } 288 traversed.put(node.getName(), VisitStatus.VISITED); 289 } 290 }