001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.workflow.lite; 020 021import java.util.ArrayDeque; 022import java.util.Deque; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.Set; 029 030import org.apache.commons.collections.CollectionUtils; 031import org.apache.oozie.ErrorCode; 032import org.apache.oozie.service.ActionService; 033import org.apache.oozie.service.Services; 034import org.apache.oozie.util.ParamChecker; 035import org.apache.oozie.util.XmlUtils; 036import org.apache.oozie.workflow.WorkflowException; 037import org.jdom.Element; 038import org.jdom.JDOMException; 039 040import com.google.common.base.Joiner; 041import com.google.common.base.Objects; 042import com.google.common.base.Optional; 043 044public class LiteWorkflowValidator { 045 046 public void validateWorkflow(LiteWorkflowApp app, boolean validateForkJoin) throws WorkflowException { 047 NodeDef startNode = app.getNode(StartNodeDef.START); 048 if (startNode == null) { 049 throw new WorkflowException(ErrorCode.E0700, "no start node"); // shouldn't happen, but just in case... 050 } 051 052 ForkJoinCount forkJoinCount = new ForkJoinCount(); 053 054 performBasicValidation(app, startNode, new ArrayDeque<String>(), new HashSet<NodeDef>(), forkJoinCount); 055 056 if (validateForkJoin) { 057 // don't validate fork/join pairs if the number of forks and joins mismatch 058 if (forkJoinCount.forks != forkJoinCount.joins) { 059 throw new WorkflowException(ErrorCode.E0730); 060 } 061 062 validateForkJoin(app, 063 startNode, 064 null, 065 null, 066 true, 067 new ArrayDeque<String>(), 068 new HashMap<String, String>(), 069 new HashMap<String, Optional<String>>()); 070 } 071 } 072 073 /** 074 * Basic recursive validation of the workflow: 075 * - it is acyclic, no loops 076 * - names of the actions follow a specific pattern 077 * - all nodes have valid transitions 078 * - it only has supported action nodes 079 * - there is no node that points to itself 080 * - counts fork/join nodes 081 * 082 * @param app The WorkflowApp 083 * @param node Current node we're checking 084 * @param path The list of nodes that we've visited so far in this call chain 085 * @param checkedNodes The list of nodes that we've already checked. For example, if it's a decision node, then the we 086 * don't have to re-walk the entire path because it indicates that it've been done before on a separate path 087 * @param forkJoinCount Number of fork and join nodes 088 * @throws WorkflowException If there is any of the constraints described above is violated 089 */ 090 private void performBasicValidation(LiteWorkflowApp app, NodeDef node, Deque<String> path, Set<NodeDef> checkedNodes, 091 ForkJoinCount forkJoinCount) throws WorkflowException { 092 String nodeName = node.getName(); 093 094 checkActionName(node); 095 if (node instanceof ActionNodeDef) { 096 checkActionNode(node); 097 } else if (node instanceof ForkNodeDef) { 098 forkJoinCount.forks++; 099 } else if (node instanceof JoinNodeDef) { 100 forkJoinCount.joins++; 101 } 102 checkCycle(path, nodeName); 103 104 path.addLast(nodeName); 105 106 List<String> transitions = node.getTransitions(); 107 // Get all transitions and walk the workflow recursively 108 if (!transitions.isEmpty()) { 109 for (final String t : transitions) { 110 NodeDef transitionNode = app.getNode(t); 111 if (transitionNode == null) { 112 throw new WorkflowException(ErrorCode.E0708, node.getName(), t); 113 } 114 115 if (!checkedNodes.contains(transitionNode)) { 116 performBasicValidation(app, transitionNode, path, checkedNodes, forkJoinCount); 117 checkedNodes.add(transitionNode); 118 } 119 } 120 } 121 122 path.remove(nodeName); 123 } 124 125 /** 126 * This method recursively validates two things: 127 * - fork/join methods are properly paired 128 * - there are no multiple "okTo" paths to a given node 129 * 130 * Important: this method assumes that the workflow is not acyclic - therefore this must run after performBasicValidation() 131 * 132 * @param app The WorkflowApp 133 * @param node Current node we're checking 134 * @param currentFork Current fork node (null if we are not under a fork path) 135 * @param topDecisionParent The top (eldest) decision node along the path to this node, or null if there isn't one 136 * @param okPath false if node (or an ancestor of node) was gotten to via an "error to" transition or via a join node that has 137 * already been visited at least once before 138 * @param forkJoins Map that contains a mapping of fork-join node pairs. 139 * @param nodeAndDecisionParents Map that contains a mapping of nodes and their eldest decision node 140 * @throws WorkflowException If there is any of the constraints described above is violated 141 */ 142 private void validateForkJoin(LiteWorkflowApp app, NodeDef node, NodeDef currentFork, String topDecisionParent, 143 boolean okPath, Deque<String> path, Map<String, String> forkJoins, 144 Map<String, Optional<String>> nodeAndDecisionParents) throws WorkflowException { 145 final String nodeName = node.getName(); 146 147 path.addLast(nodeName); 148 149 /* If we're walking an "okTo" path and the nodes are not Kill/Join/End, we have to make sure that only a single 150 * "okTo" path exists to the current node. 151 * 152 * The "topDecisionParent" represents the eldest decision in the chain that we've gone through. For example, let's assume 153 * that D1, D2, D3 are decision nodes and A is an action node. 154 * 155 * D1-->D2-->D3---> ... (rest of the WF) 156 * | | | 157 * | | | 158 * | | +----> +---+ 159 * | +---------> | A | 160 * +-------------> +---+ 161 * 162 * In this case, there are three "okTo" paths to "A" but it's still a valid workflow because the eldest decision node 163 * is D1 and during every run, there is only one possible execution path that leads to A (D1->A, D1->D2->A or 164 * (D1->D2->D3->A). In the code, if we encounter a decision node and we already have one, we don't update it. If it's null 165 * then we set it to the current decision node we're under. 166 * 167 * If the "current" and "top" parents are null, it means that we reached the node from two separate "okTo" paths, which is 168 * not acceptable. 169 * 170 * Also, if we have two distinct top decision parents it means that the node is reachable from two decision paths which 171 * are not "chained" (like in the example). 172 * 173 * It's worth noting that the last two examples can only occur in case of fork-join when we start to execute at least 174 * two separate paths in parallel. Without fork-join, multiple parents or two null parents would mean that there is a loop 175 * in the workflow but that should not happen since it has been validated. 176 */ 177 if (okPath && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) { 178 // using Optional here so we can distinguish between "non-visited" and "visited - no parent" state. 179 Optional<String> decisionParentOpt = nodeAndDecisionParents.get(nodeName); 180 if (decisionParentOpt == null) { 181 nodeAndDecisionParents.put(node.getName(), Optional.fromNullable(topDecisionParent)); 182 } else { 183 String decisionParent = decisionParentOpt.isPresent() ? decisionParentOpt.get() : null; 184 185 if ((decisionParent == null && topDecisionParent == null) || !Objects.equal(decisionParent, topDecisionParent)) { 186 throw new WorkflowException(ErrorCode.E0743, nodeName); 187 } 188 } 189 } 190 191 /* Fork-Join validation logic: 192 * 193 * At each Fork node, we recurse to every possible paths, changing the "currentFork" variable to the Fork node. We stop 194 * walking as soon as we encounter a Join node. At the Join node, we update the forkJoin mapping, which maintains 195 * the relationship between every fork-join pair (actually it's join->fork mapping). We check whether the join->fork 196 * mapping already contains another Fork node, which means that the Join is reachable from at least two distinct 197 * Fork nodes, so we terminate the validation. 198 * 199 * From the Join node, we don't recurse further. Therefore, all recursive calls return back to the point where we called 200 * validateForkJoin() from the Fork node in question. 201 * 202 * At this point, we have to check how many different Join nodes we've found at each different paths. We collect them to 203 * a set, then we make sure that we have only a single Join node for all Fork paths. Otherwise the workflow is broken. 204 * 205 * If we have only a single Join, then we get the transition node from the Join and go on with the recursive validation - 206 * this time we use the original "currentFork" variable that we have on the stack. With this approach, nested 207 * Fork-Joins are handled correctly. 208 */ 209 if (node instanceof ForkNodeDef) { 210 final List<String> transitions = node.getTransitions(); 211 212 checkForkTransitions(app, transitions, node); 213 214 for (String t : transitions) { 215 NodeDef transition = app.getNode(t); 216 validateForkJoin(app, transition, node, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents); 217 } 218 219 // get the Join node for this ForkNode & validate it (we must have only one) 220 Set<String> joins = new HashSet<String>(); 221 collectJoins(app, forkJoins, nodeName, joins); 222 checkJoins(joins, nodeName); 223 224 List<String> joinTransitions = app.getNode(joins.iterator().next()).getTransitions(); 225 NodeDef next = app.getNode(joinTransitions.get(0)); 226 227 validateForkJoin(app, next, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents); 228 } else if (node instanceof JoinNodeDef) { 229 if (currentFork == null) { 230 throw new WorkflowException(ErrorCode.E0742, node.getName()); 231 } 232 233 // join --> fork mapping 234 String forkNode = forkJoins.get(nodeName); 235 if (forkNode == null) { 236 forkJoins.put(nodeName, currentFork.getName()); 237 } else if (!forkNode.equals(currentFork.getName())) { 238 throw new WorkflowException(ErrorCode.E0758, node.getName(), forkNode + "," + currentFork); 239 } 240 } else if (node instanceof DecisionNodeDef) { 241 List<String> transitions = node.getTransitions(); 242 243 // see explanation above - if we already have a topDecisionParent, we don't update it 244 String parentDecisionNode = topDecisionParent; 245 if (parentDecisionNode == null) { 246 parentDecisionNode = nodeName; 247 } 248 249 for (String t : transitions) { 250 NodeDef transition = app.getNode(t); 251 validateForkJoin(app, transition, currentFork, parentDecisionNode, okPath, path, forkJoins, 252 nodeAndDecisionParents); 253 } 254 } else if (node instanceof KillNodeDef) { 255 // no op 256 } else if (node instanceof EndNodeDef) { 257 // We can't end the WF if we're on a Fork path. From the "path" deque, we remove the last node (which 258 // is the current "End") and look at last node again so we know where we came from 259 if (currentFork != null) { 260 path.removeLast(); 261 String previous = path.peekLast(); 262 throw new WorkflowException(ErrorCode.E0737, previous, node.getName()); 263 } 264 } else if (node instanceof ActionNodeDef) { 265 String transition = node.getTransitions().get(0); // "ok to" transition 266 NodeDef okNode = app.getNode(transition); 267 validateForkJoin(app, okNode, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents); 268 269 transition = node.getTransitions().get(1); // "error to" transition 270 NodeDef errorNode = app.getNode(transition); 271 validateForkJoin(app, errorNode, currentFork, topDecisionParent, false, path, forkJoins, nodeAndDecisionParents); 272 } else if (node instanceof StartNodeDef) { 273 String transition = node.getTransitions().get(0); // start always has only 1 transition 274 NodeDef tranNode = app.getNode(transition); 275 validateForkJoin(app, tranNode, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents); 276 } else { 277 throw new WorkflowException(ErrorCode.E0740, node.getClass()); 278 } 279 280 path.remove(nodeName); 281 } 282 283 private void checkActionName(NodeDef node) throws WorkflowException { 284 if (!(node instanceof StartNodeDef)) { 285 try { 286 ParamChecker.validateActionName(node.getName()); 287 } catch (IllegalArgumentException ex) { 288 throw new WorkflowException(ErrorCode.E0724, ex.getMessage()); 289 } 290 } 291 } 292 293 private void checkActionNode(NodeDef node) throws WorkflowException { 294 try { 295 Element action = XmlUtils.parseXml(node.getConf()); 296 ActionService actionService = Services.get().get(ActionService.class); 297 boolean supportedAction = actionService.hasActionType(action.getName()); 298 if (!supportedAction) { 299 throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName()); 300 } 301 } catch (JDOMException ex) { 302 throw new WorkflowException(ErrorCode.E0700, "JDOMException: " + ex.getMessage()); 303 } 304 } 305 306 private void checkCycle(Deque<String> path, String nodeName) throws WorkflowException { 307 if (path.contains(nodeName)) { 308 path.addLast(nodeName); 309 throw new WorkflowException(ErrorCode.E0707, nodeName, Joiner.on("->").join(path)); 310 } 311 } 312 313 // Check that a fork doesn't go to the same node more than once 314 private void checkForkTransitions(LiteWorkflowApp app, List<String> transitionsList, NodeDef node) throws WorkflowException { 315 for (final String t : transitionsList) { 316 NodeDef aNode = app.getNode(t); 317 // Now we have to figure out which node is the problem and what type of node they are (join and kill are ok) 318 if (!(aNode instanceof JoinNodeDef) && !(aNode instanceof KillNodeDef)) { 319 int count = CollectionUtils.cardinality(t, transitionsList); 320 321 if (count > 1) { 322 throw new WorkflowException(ErrorCode.E0744, node.getName(), t); 323 } 324 } 325 } 326 } 327 328 private void collectJoins(LiteWorkflowApp app, Map<String, String> forkJoinPairs, String nodeName, Set<String> joins) { 329 for (Entry<String, String> entry : forkJoinPairs.entrySet()) { 330 if (entry.getValue().equals(nodeName)) { 331 joins.add(app.getNode(entry.getKey()).getName()); 332 } 333 } 334 } 335 336 private void checkJoins(Set<String> joinNodes, String forkName) throws WorkflowException { 337 if (joinNodes.size() == 0) { 338 throw new WorkflowException(ErrorCode.E0733, forkName); 339 } 340 341 if (joinNodes.size() > 1) { 342 throw new WorkflowException(ErrorCode.E0757, forkName, Joiner.on(",").join(joinNodes)); 343 } 344 } 345 346 // Tiny utility class where we keep track of how many fork and join nodes we have found 347 private class ForkJoinCount { 348 int forks = 0; 349 int joins = 0; 350 } 351}