001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.workflow.lite;
020
021import java.util.ArrayDeque;
022import java.util.Deque;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Set;
029
030import org.apache.commons.collections.CollectionUtils;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.service.ActionService;
033import org.apache.oozie.service.Services;
034import org.apache.oozie.util.ParamChecker;
035import org.apache.oozie.util.XmlUtils;
036import org.apache.oozie.workflow.WorkflowException;
037import org.jdom.Element;
038import org.jdom.JDOMException;
039
040import com.google.common.base.Joiner;
041import com.google.common.base.Objects;
042import com.google.common.base.Optional;
043
044public class LiteWorkflowValidator {
045
046    public void validateWorkflow(LiteWorkflowApp app, boolean validateForkJoin) throws WorkflowException {
047        NodeDef startNode = app.getNode(StartNodeDef.START);
048        if (startNode == null) {
049            throw new WorkflowException(ErrorCode.E0700, "no start node"); // shouldn't happen, but just in case...
050        }
051
052        ForkJoinCount forkJoinCount = new ForkJoinCount();
053
054        performBasicValidation(app, startNode, new ArrayDeque<String>(), new HashSet<NodeDef>(), forkJoinCount);
055
056        if (validateForkJoin) {
057            // don't validate fork/join pairs if the number of forks and joins mismatch
058            if (forkJoinCount.forks != forkJoinCount.joins) {
059                throw new WorkflowException(ErrorCode.E0730);
060            }
061
062            validateForkJoin(app,
063                    startNode,
064                    null,
065                    null,
066                    true,
067                    new ArrayDeque<String>(),
068                    new HashMap<String, String>(),
069                    new HashMap<String, Optional<String>>());
070        }
071    }
072
073    /**
074     * Basic recursive validation of the workflow:
075     * - it is acyclic, no loops
076     * - names of the actions follow a specific pattern
077     * - all nodes have valid transitions
078     * - it only has supported action nodes
079     * - there is no node that points to itself
080     * - counts fork/join nodes
081     *
082     * @param app The WorkflowApp
083     * @param node Current node we're checking
084     * @param path The list of nodes that we've visited so far in this call chain
085     * @param checkedNodes The list of nodes that we've already checked. For example, if it's a decision node, then the we
086     * don't have to re-walk the entire path because it indicates that it've been done before on a separate path
087     * @param forkJoinCount Number of fork and join nodes
088     * @throws WorkflowException If there is any of the constraints described above is violated
089     */
090    private void performBasicValidation(LiteWorkflowApp app, NodeDef node, Deque<String> path, Set<NodeDef> checkedNodes,
091            ForkJoinCount forkJoinCount) throws WorkflowException {
092        String nodeName = node.getName();
093
094        checkActionName(node);
095        if (node instanceof ActionNodeDef) {
096            checkActionNode(node);
097        } else if (node instanceof ForkNodeDef) {
098            forkJoinCount.forks++;
099        } else if (node instanceof JoinNodeDef) {
100            forkJoinCount.joins++;
101        }
102        checkCycle(path, nodeName);
103
104        path.addLast(nodeName);
105
106        List<String> transitions = node.getTransitions();
107        // Get all transitions and walk the workflow recursively
108        if (!transitions.isEmpty()) {
109            for (final String t : transitions) {
110                NodeDef transitionNode = app.getNode(t);
111                if (transitionNode == null) {
112                    throw new WorkflowException(ErrorCode.E0708, node.getName(), t);
113                }
114
115                if (!checkedNodes.contains(transitionNode)) {
116                    performBasicValidation(app, transitionNode, path, checkedNodes, forkJoinCount);
117                    checkedNodes.add(transitionNode);
118                }
119            }
120        }
121
122        path.remove(nodeName);
123    }
124
125    /**
126     * This method recursively validates two things:
127     * - fork/join methods are properly paired
128     * - there are no multiple "okTo" paths to a given node
129     *
130     * Important: this method assumes that the workflow is not acyclic - therefore this must run after performBasicValidation()
131     *
132     * @param app The WorkflowApp
133     * @param node Current node we're checking
134     * @param currentFork Current fork node (null if we are not under a fork path)
135     * @param topDecisionParent The top (eldest) decision node along the path to this node, or null if there isn't one
136     * @param okPath false if node (or an ancestor of node) was gotten to via an "error to" transition or via a join node that has
137     * already been visited at least once before
138     * @param forkJoins Map that contains a mapping of fork-join node pairs.
139     * @param nodeAndDecisionParents Map that contains a mapping of nodes and their eldest decision node
140     * @throws WorkflowException If there is any of the constraints described above is violated
141     */
142    private void validateForkJoin(LiteWorkflowApp app, NodeDef node, NodeDef currentFork, String topDecisionParent,
143            boolean okPath, Deque<String> path, Map<String, String> forkJoins,
144            Map<String, Optional<String>> nodeAndDecisionParents) throws WorkflowException {
145        final String nodeName = node.getName();
146
147        path.addLast(nodeName);
148
149        /* If we're walking an "okTo" path and the nodes are not Kill/Join/End, we have to make sure that only a single
150         * "okTo" path exists to the current node.
151         *
152         * The "topDecisionParent" represents the eldest decision in the chain that we've gone through. For example, let's assume
153         * that D1, D2, D3 are decision nodes and A is an action node.
154         *
155         * D1-->D2-->D3---> ... (rest of the WF)
156         *  |   |    |
157         *  |   |    |
158         *  |   |    +----> +---+
159         *  |   +---------> | A |
160         *  +-------------> +---+
161         *
162         * In this case, there are three "okTo" paths to "A" but it's still a valid workflow because the eldest decision node
163         * is D1 and during every run, there is only one possible execution path that leads to A (D1->A, D1->D2->A or
164         * (D1->D2->D3->A). In the code, if we encounter a decision node and we already have one, we don't update it. If it's null
165         * then we set it to the current decision node we're under.
166         *
167         * If the "current" and "top" parents are null, it means that we reached the node from two separate "okTo" paths, which is
168         * not acceptable.
169         *
170         * Also, if we have two distinct top decision parents it means that the node is reachable from two decision paths which
171         * are not "chained" (like in the example).
172         *
173         * It's worth noting that the last two examples can only occur in case of fork-join when we start to execute at least
174         * two separate paths in parallel. Without fork-join, multiple parents or two null parents would mean that there is a loop
175         * in the workflow but that should not happen since it has been validated.
176         */
177        if (okPath && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) {
178            // using Optional here so we can distinguish between "non-visited" and "visited - no parent" state.
179            Optional<String> decisionParentOpt = nodeAndDecisionParents.get(nodeName);
180            if (decisionParentOpt == null) {
181                nodeAndDecisionParents.put(node.getName(), Optional.fromNullable(topDecisionParent));
182            } else {
183                String decisionParent = decisionParentOpt.isPresent() ? decisionParentOpt.get() : null;
184
185                if ((decisionParent == null && topDecisionParent == null) || !Objects.equal(decisionParent, topDecisionParent)) {
186                    throw new WorkflowException(ErrorCode.E0743, nodeName);
187                }
188            }
189        }
190
191        /* Fork-Join validation logic:
192         *
193         * At each Fork node, we recurse to every possible paths, changing the "currentFork" variable to the Fork node. We stop
194         * walking as soon as we encounter a Join node. At the Join node, we update the forkJoin mapping, which maintains
195         * the relationship between every fork-join pair (actually it's join->fork mapping). We check whether the join->fork
196         * mapping already contains another Fork node, which means that the Join is reachable from at least two distinct
197         * Fork nodes, so we terminate the validation.
198         *
199         * From the Join node, we don't recurse further. Therefore, all recursive calls return back to the point where we called
200         * validateForkJoin() from the Fork node in question.
201         *
202         * At this point, we have to check how many different Join nodes we've found at each different paths. We collect them to
203         * a set, then we make sure that we have only a single Join node for all Fork paths. Otherwise the workflow is broken.
204         *
205         * If we have only a single Join, then we get the transition node from the Join and go on with the recursive validation -
206         * this time we use the original "currentFork" variable that we have on the stack. With this approach, nested
207         * Fork-Joins are handled correctly.
208         */
209        if (node instanceof ForkNodeDef) {
210            final List<String> transitions = node.getTransitions();
211
212            checkForkTransitions(app, transitions, node);
213
214            for (String t : transitions) {
215                NodeDef transition = app.getNode(t);
216                validateForkJoin(app, transition, node, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents);
217            }
218
219            // get the Join node for this ForkNode & validate it (we must have only one)
220            Set<String> joins = new HashSet<String>();
221            collectJoins(app, forkJoins, nodeName, joins);
222            checkJoins(joins, nodeName);
223
224            List<String> joinTransitions = app.getNode(joins.iterator().next()).getTransitions();
225            NodeDef next = app.getNode(joinTransitions.get(0));
226
227            validateForkJoin(app, next, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents);
228        } else if (node instanceof JoinNodeDef) {
229            if (currentFork == null) {
230                throw new WorkflowException(ErrorCode.E0742, node.getName());
231            }
232
233            // join --> fork mapping
234            String forkNode = forkJoins.get(nodeName);
235            if (forkNode == null) {
236                forkJoins.put(nodeName, currentFork.getName());
237            } else if (!forkNode.equals(currentFork.getName())) {
238                throw new WorkflowException(ErrorCode.E0758, node.getName(), forkNode + "," + currentFork);
239            }
240        } else if (node instanceof DecisionNodeDef) {
241            List<String> transitions = node.getTransitions();
242
243            // see explanation above - if we already have a topDecisionParent, we don't update it
244            String parentDecisionNode = topDecisionParent;
245            if (parentDecisionNode == null) {
246                parentDecisionNode = nodeName;
247            }
248
249            for (String t : transitions) {
250                NodeDef transition = app.getNode(t);
251                validateForkJoin(app, transition, currentFork, parentDecisionNode, okPath, path, forkJoins,
252                        nodeAndDecisionParents);
253            }
254        } else if (node instanceof KillNodeDef) {
255            // no op
256        } else if (node instanceof EndNodeDef) {
257            // We can't end the WF if we're on a Fork path. From the "path" deque, we remove the last node (which
258            // is the current "End") and look at last node again so we know where we came from
259            if (currentFork != null) {
260                path.removeLast();
261                String previous = path.peekLast();
262                throw new WorkflowException(ErrorCode.E0737, previous, node.getName());
263            }
264        } else if (node instanceof ActionNodeDef) {
265            String transition = node.getTransitions().get(0);   // "ok to" transition
266            NodeDef okNode = app.getNode(transition);
267            validateForkJoin(app, okNode, currentFork, topDecisionParent, true, path, forkJoins, nodeAndDecisionParents);
268
269            transition = node.getTransitions().get(1);          // "error to" transition
270            NodeDef errorNode = app.getNode(transition);
271            validateForkJoin(app, errorNode, currentFork, topDecisionParent, false, path, forkJoins, nodeAndDecisionParents);
272        } else if (node instanceof StartNodeDef) {
273            String transition = node.getTransitions().get(0);   // start always has only 1 transition
274            NodeDef tranNode = app.getNode(transition);
275            validateForkJoin(app, tranNode, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents);
276        } else {
277            throw new WorkflowException(ErrorCode.E0740, node.getClass());
278        }
279
280        path.remove(nodeName);
281    }
282
283    private void checkActionName(NodeDef node) throws WorkflowException {
284        if (!(node instanceof StartNodeDef)) {
285            try {
286                ParamChecker.validateActionName(node.getName());
287            } catch (IllegalArgumentException ex) {
288                throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
289            }
290        }
291    }
292
293    private void checkActionNode(NodeDef node) throws WorkflowException {
294        try {
295            Element action = XmlUtils.parseXml(node.getConf());
296            ActionService actionService = Services.get().get(ActionService.class);
297            boolean supportedAction = actionService.hasActionType(action.getName());
298            if (!supportedAction) {
299                throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
300            }
301        } catch (JDOMException ex) {
302            throw new WorkflowException(ErrorCode.E0700, "JDOMException: " + ex.getMessage());
303        }
304    }
305
306    private void checkCycle(Deque<String> path, String nodeName) throws WorkflowException {
307        if (path.contains(nodeName)) {
308            path.addLast(nodeName);
309            throw new WorkflowException(ErrorCode.E0707, nodeName, Joiner.on("->").join(path));
310        }
311    }
312
313    // Check that a fork doesn't go to the same node more than once
314    private void checkForkTransitions(LiteWorkflowApp app, List<String> transitionsList, NodeDef node) throws WorkflowException {
315        for (final String t : transitionsList) {
316            NodeDef aNode = app.getNode(t);
317            // Now we have to figure out which node is the problem and what type of node they are (join and kill are ok)
318            if (!(aNode instanceof JoinNodeDef) && !(aNode instanceof KillNodeDef)) {
319                int count = CollectionUtils.cardinality(t, transitionsList);
320
321                if (count > 1) {
322                    throw new WorkflowException(ErrorCode.E0744, node.getName(), t);
323                }
324            }
325        }
326    }
327
328    private void collectJoins(LiteWorkflowApp app, Map<String, String> forkJoinPairs, String nodeName, Set<String> joins) {
329        for (Entry<String, String> entry : forkJoinPairs.entrySet()) {
330            if (entry.getValue().equals(nodeName)) {
331                joins.add(app.getNode(entry.getKey()).getName());
332            }
333        }
334    }
335
336    private void checkJoins(Set<String> joinNodes, String forkName) throws WorkflowException {
337        if (joinNodes.size() == 0) {
338            throw new WorkflowException(ErrorCode.E0733, forkName);
339        }
340
341        if (joinNodes.size() > 1) {
342            throw new WorkflowException(ErrorCode.E0757, forkName, Joiner.on(",").join(joinNodes));
343        }
344    }
345
346    // Tiny utility class where we keep track of how many fork and join nodes we have found
347    private class ForkJoinCount {
348        int forks = 0;
349        int joins = 0;
350    }
351}