001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.workflow.lite;
019
020import org.apache.oozie.workflow.WorkflowException;
021import org.apache.oozie.util.ELUtils;
022import org.apache.oozie.util.IOUtils;
023import org.apache.oozie.util.XConfiguration;
024import org.apache.oozie.util.XmlUtils;
025import org.apache.oozie.util.ParamChecker;
026import org.apache.oozie.util.ParameterVerifier;
027import org.apache.oozie.util.ParameterVerifierException;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.action.ActionExecutor;
030import org.apache.oozie.service.Services;
031import org.apache.oozie.service.ActionService;
032import org.apache.commons.lang.StringUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.jdom.Element;
035import org.jdom.JDOMException;
036import org.jdom.Namespace;
037import org.xml.sax.SAXException;
038
039import javax.xml.transform.stream.StreamSource;
040import javax.xml.validation.Schema;
041import javax.xml.validation.Validator;
042
043import java.io.IOException;
044import java.io.Reader;
045import java.io.StringReader;
046import java.io.StringWriter;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Deque;
050import java.util.HashMap;
051import java.util.HashSet;
052import java.util.LinkedList;
053import java.util.List;
054import java.util.Map;
055
056/**
057 * Class to parse and validate workflow xml
058 */
059public class LiteWorkflowAppParser {
060
061    private static final String DECISION_E = "decision";
062    private static final String ACTION_E = "action";
063    private static final String END_E = "end";
064    private static final String START_E = "start";
065    private static final String JOIN_E = "join";
066    private static final String FORK_E = "fork";
067    private static final Object KILL_E = "kill";
068
069    private static final String SLA_INFO = "info";
070    private static final String CREDENTIALS = "credentials";
071    private static final String GLOBAL = "global";
072    private static final String PARAMETERS = "parameters";
073
074    private static final String NAME_A = "name";
075    private static final String CRED_A = "cred";
076    private static final String USER_RETRY_MAX_A = "retry-max";
077    private static final String USER_RETRY_INTERVAL_A = "retry-interval";
078    private static final String TO_A = "to";
079
080    private static final String FORK_PATH_E = "path";
081    private static final String FORK_START_A = "start";
082
083    private static final String ACTION_OK_E = "ok";
084    private static final String ACTION_ERROR_E = "error";
085
086    private static final String DECISION_SWITCH_E = "switch";
087    private static final String DECISION_CASE_E = "case";
088    private static final String DECISION_DEFAULT_E = "default";
089
090    private static final String KILL_MESSAGE_E = "message";
091    public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
092    public static final String WF_VALIDATE_FORK_JOIN = "oozie.wf.validate.ForkJoin";
093
094    private Schema schema;
095    private Class<? extends ControlNodeHandler> controlNodeHandler;
096    private Class<? extends DecisionNodeHandler> decisionHandlerClass;
097    private Class<? extends ActionNodeHandler> actionHandlerClass;
098
099    private static enum VisitStatus {
100        VISITING, VISITED
101    }
102
103    /**
104     * We use this to store a node name and its top (eldest) decision parent node name for the forkjoin validation
105     */
106    class NodeAndTopDecisionParent {
107        String node;
108        String topDecisionParent;
109
110        public NodeAndTopDecisionParent(String node, String topDecisionParent) {
111            this.node = node;
112            this.topDecisionParent = topDecisionParent;
113        }
114    }
115
116    private List<String> forkList = new ArrayList<String>();
117    private List<String> joinList = new ArrayList<String>();
118    private StartNodeDef startNode;
119    private List<NodeAndTopDecisionParent> visitedOkNodes = new ArrayList<NodeAndTopDecisionParent>();
120    private List<String> visitedJoinNodes = new ArrayList<String>();
121
122    public LiteWorkflowAppParser(Schema schema,
123                                 Class<? extends ControlNodeHandler> controlNodeHandler,
124                                 Class<? extends DecisionNodeHandler> decisionHandlerClass,
125                                 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
126        this.schema = schema;
127        this.controlNodeHandler = controlNodeHandler;
128        this.decisionHandlerClass = decisionHandlerClass;
129        this.actionHandlerClass = actionHandlerClass;
130    }
131
132    public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException {
133        return validateAndParse(reader, jobConf, null);
134    }
135
136    /**
137     * Parse and validate xml to {@link LiteWorkflowApp}
138     *
139     * @param reader
140     * @return LiteWorkflowApp
141     * @throws WorkflowException
142     */
143    public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf, Configuration configDefault)
144            throws WorkflowException {
145        try {
146            StringWriter writer = new StringWriter();
147            IOUtils.copyCharStream(reader, writer);
148            String strDef = writer.toString();
149
150            if (schema != null) {
151                Validator validator = schema.newValidator();
152                validator.validate(new StreamSource(new StringReader(strDef)));
153            }
154
155            Element wfDefElement = XmlUtils.parseXml(strDef);
156            ParameterVerifier.verifyParameters(jobConf, wfDefElement);
157            LiteWorkflowApp app = parse(strDef, wfDefElement, configDefault, jobConf);
158            Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
159            traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
160            validate(app, app.getNode(StartNodeDef.START), traversed);
161            //Validate whether fork/join are in pair or not
162            if (jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true) && Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
163                validateForkJoin(app);
164            }
165            return app;
166        }
167        catch (ParameterVerifierException ex) {
168            throw new WorkflowException(ex);
169        }
170        catch (JDOMException ex) {
171            throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
172        }
173        catch (SAXException ex) {
174            throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
175        }
176        catch (IOException ex) {
177            throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
178        }
179    }
180
181    /**
182     * Validate whether fork/join are in pair or not
183     * @param app LiteWorkflowApp
184     * @throws WorkflowException
185     */
186    private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
187        // Make sure the number of forks and joins in wf are equal
188        if (forkList.size() != joinList.size()) {
189            throw new WorkflowException(ErrorCode.E0730);
190        }
191
192        // No need to bother going through all of this if there are no fork/join nodes
193        if (!forkList.isEmpty()) {
194            visitedOkNodes.clear();
195            visitedJoinNodes.clear();
196            validateForkJoin(startNode, app, new LinkedList<String>(), new LinkedList<String>(), new LinkedList<String>(), true,
197                    null);
198        }
199    }
200
201    /*
202     * Recursively walk through the DAG and make sure that all fork paths are valid.
203     * This should be called from validateForkJoin(LiteWorkflowApp app).  It assumes that visitedOkNodes and visitedJoinNodes are
204     * both empty ArrayLists on the first call.
205     *
206     * @param node the current node; use the startNode on the first call
207     * @param app the WorkflowApp
208     * @param forkNodes a stack of the current fork nodes
209     * @param joinNodes a stack of the current join nodes
210     * @param path a stack of the current path
211     * @param okTo false if node (or an ancestor of node) was gotten to via an "error to" transition or via a join node that has
212     * already been visited at least once before
213     * @param topDecisionParent The top (eldest) decision node along the path to this node, or null if there isn't one
214     * @throws WorkflowException
215     */
216    private void validateForkJoin(NodeDef node, LiteWorkflowApp app, Deque<String> forkNodes, Deque<String> joinNodes,
217            Deque<String> path, boolean okTo, String topDecisionParent) throws WorkflowException {
218        if (path.contains(node.getName())) {
219            // cycle
220            throw new WorkflowException(ErrorCode.E0741, node.getName(), Arrays.toString(path.toArray()));
221        }
222        path.push(node.getName());
223
224        // Make sure that we're not revisiting a node (that's not a Kill, Join, or End type) that's been visited before from an
225        // "ok to" transition; if its from an "error to" transition, then its okay to visit it multiple times.  Also, because we
226        // traverse through join nodes multiple times, we have to make sure not to throw an exception here when we're really just
227        // re-walking the same execution path (this is why we need the visitedJoinNodes list used later)
228        if (okTo && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) {
229            NodeAndTopDecisionParent natdp = findInVisitedOkNodes(node.getName());
230            if (natdp != null) {
231                // However, if we've visited the node and it's under a decision node, we may be seeing it again and it's only
232                // illegal if that decision node is not the same as what we're seeing now (because during execution we only go
233                // down one path of the decision node, so while we're seeing the node multiple times here, during runtime it will
234                // only be executed once).  Also, this decision node should be the top (eldest) decision node.  As null indicates
235                // that there isn't a decision node, when this happens they must both be null to be valid.  Here is a good example
236                // to visualize a node ("actionX") that has three "ok to" paths to it, but should still be a valid workflow (it may
237                // be easier to see if you draw it):
238                    // decisionA --> {actionX, decisionB}
239                    // decisionB --> {actionX, actionY}
240                    // actionY   --> {actionX}
241                // And, if we visit this node twice under the same decision node in an invalid way, the path cycle checking code
242                // will catch it, so we don't have to worry about that here.
243                if ((natdp.topDecisionParent == null && topDecisionParent == null)
244                     || (natdp.topDecisionParent == null && topDecisionParent != null)
245                     || (natdp.topDecisionParent != null && topDecisionParent == null)
246                     || !natdp.topDecisionParent.equals(topDecisionParent)) {
247                    // If we get here, then we've seen this node before from an "ok to" transition but they don't have the same
248                    // decision node top parent, which means that this node will be executed twice, which is illegal
249                    throw new WorkflowException(ErrorCode.E0743, node.getName());
250                }
251            }
252            else {
253                // If we haven't transitioned to this node before, add it and its top decision parent node
254                visitedOkNodes.add(new NodeAndTopDecisionParent(node.getName(), topDecisionParent));
255            }
256        }
257
258        if (node instanceof StartNodeDef) {
259            String transition = node.getTransitions().get(0);   // start always has only 1 transition
260            NodeDef tranNode = app.getNode(transition);
261            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, topDecisionParent);
262        }
263        else if (node instanceof ActionNodeDef) {
264            String transition = node.getTransitions().get(0);   // "ok to" transition
265            NodeDef tranNode = app.getNode(transition);
266            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, topDecisionParent);  // propogate okTo
267            transition = node.getTransitions().get(1);          // "error to" transition
268            tranNode = app.getNode(transition);
269            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false, topDecisionParent); // use false
270        }
271        else if (node instanceof DecisionNodeDef) {
272            for(String transition : (new HashSet<String>(node.getTransitions()))) {
273                NodeDef tranNode = app.getNode(transition);
274                // if there currently isn't a topDecisionParent (i.e. null), then use this node instead of propagating null
275                String parentDecisionNode = topDecisionParent;
276                if (parentDecisionNode == null) {
277                    parentDecisionNode = node.getName();
278                }
279                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, parentDecisionNode);
280            }
281        }
282        else if (node instanceof ForkNodeDef) {
283            forkNodes.push(node.getName());
284            List<String> transitionsList = node.getTransitions();
285            HashSet<String> transitionsSet = new HashSet<String>(transitionsList);
286            // Check that a fork doesn't go to the same node more than once
287            if (!transitionsList.isEmpty() && transitionsList.size() != transitionsSet.size()) {
288                // Now we have to figure out which node is the problem and what type of node they are (join and kill are ok)
289                for (int i = 0; i < transitionsList.size(); i++) {
290                    String a = transitionsList.get(i);
291                    NodeDef aNode = app.getNode(a);
292                    if (!(aNode instanceof JoinNodeDef) && !(aNode instanceof KillNodeDef)) {
293                        for (int k = i+1; k < transitionsList.size(); k++) {
294                            String b = transitionsList.get(k);
295                            if (a.equals(b)) {
296                                throw new WorkflowException(ErrorCode.E0744, node.getName(), a);
297                            }
298                        }
299                    }
300                }
301            }
302            for(String transition : transitionsSet) {
303                NodeDef tranNode = app.getNode(transition);
304                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, topDecisionParent);
305            }
306            forkNodes.pop();
307            if (!joinNodes.isEmpty()) {
308                joinNodes.pop();
309            }
310        }
311        else if (node instanceof JoinNodeDef) {
312            if (forkNodes.isEmpty()) {
313                // no fork for join to match with
314                throw new WorkflowException(ErrorCode.E0742, node.getName());
315            }
316            if (forkNodes.size() > joinNodes.size() && (joinNodes.isEmpty() || !joinNodes.peek().equals(node.getName()))) {
317                joinNodes.push(node.getName());
318            }
319            if (!joinNodes.peek().equals(node.getName())) {
320                // join doesn't match fork
321                throw new WorkflowException(ErrorCode.E0732, forkNodes.peek(), node.getName(), joinNodes.peek());
322            }
323            joinNodes.pop();
324            String currentForkNode = forkNodes.pop();
325            String transition = node.getTransitions().get(0);   // join always has only 1 transition
326            NodeDef tranNode = app.getNode(transition);
327            // If we're already under a situation where okTo is false, use false (propogate it)
328            // Or if we've already visited this join node, use false (because we've already traversed this path before and we don't
329            // want to throw an exception from the check against visitedOkNodes)
330            if (!okTo || visitedJoinNodes.contains(node.getName())) {
331                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false, topDecisionParent);
332            // Else, use true because this is either the first time we've gone through this join node or okTo was already false
333            } else {
334                visitedJoinNodes.add(node.getName());
335                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, true, topDecisionParent);
336            }
337            forkNodes.push(currentForkNode);
338            joinNodes.push(node.getName());
339        }
340        else if (node instanceof KillNodeDef) {
341            // do nothing
342        }
343        else if (node instanceof EndNodeDef) {
344            if (!forkNodes.isEmpty()) {
345                path.pop();     // = node
346                String parent = path.peek();
347                // can't go to an end node in a fork
348                throw new WorkflowException(ErrorCode.E0737, parent, node.getName());
349            }
350        }
351        else {
352            // invalid node type (shouldn't happen)
353            throw new WorkflowException(ErrorCode.E0740, node.getName());
354        }
355        path.pop();
356    }
357
358    /**
359     * Return a {@link NodeAndTopDecisionParent} whose {@link NodeAndTopDecisionParent#node} is equal to the passed in name, or null
360     * if it isn't in the {@link LiteWorkflowAppParser#visitedOkNodes} list.
361     *
362     * @param name The name to search for
363     * @return a NodeAndTopDecisionParent or null
364     */
365    private NodeAndTopDecisionParent findInVisitedOkNodes(String name) {
366        NodeAndTopDecisionParent natdp = null;
367        for (NodeAndTopDecisionParent v : visitedOkNodes) {
368            if (v.node.equals(name)) {
369                natdp = v;
370                break;
371            }
372        }
373        return natdp;
374    }
375
376    /**
377     * Parse xml to {@link LiteWorkflowApp}
378     *
379     * @param strDef
380     * @param root
381     * @param configDefault
382     * @param jobConf
383     * @return LiteWorkflowApp
384     * @throws WorkflowException
385     */
386    @SuppressWarnings({"unchecked"})
387    private LiteWorkflowApp parse(String strDef, Element root, Configuration configDefault, Configuration jobConf)
388            throws WorkflowException {
389        Namespace ns = root.getNamespace();
390        LiteWorkflowApp def = null;
391        Element global = null;
392        for (Element eNode : (List<Element>) root.getChildren()) {
393            if (eNode.getName().equals(START_E)) {
394                def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
395                                          new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
396            }
397            else {
398                if (eNode.getName().equals(END_E)) {
399                    def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
400                }
401                else {
402                    if (eNode.getName().equals(KILL_E)) {
403                        def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
404                                                    eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
405                    }
406                    else {
407                        if (eNode.getName().equals(FORK_E)) {
408                            List<String> paths = new ArrayList<String>();
409                            for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
410                                paths.add(tran.getAttributeValue(FORK_START_A));
411                            }
412                            def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths));
413                        }
414                        else {
415                            if (eNode.getName().equals(JOIN_E)) {
416                                def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler,
417                                                            eNode.getAttributeValue(TO_A)));
418                            }
419                            else {
420                                if (eNode.getName().equals(DECISION_E)) {
421                                    Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
422                                    List<String> transitions = new ArrayList<String>();
423                                    for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
424                                        transitions.add(e.getAttributeValue(TO_A));
425                                    }
426                                    transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
427
428                                    String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
429                                    def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
430                                                                    transitions));
431                                }
432                                else {
433                                    if (ACTION_E.equals(eNode.getName())) {
434                                        String[] transitions = new String[2];
435                                        Element eActionConf = null;
436                                        for (Element elem : (List<Element>) eNode.getChildren()) {
437                                            if (ACTION_OK_E.equals(elem.getName())) {
438                                                transitions[0] = elem.getAttributeValue(TO_A);
439                                            }
440                                            else {
441                                                if (ACTION_ERROR_E.equals(elem.getName())) {
442                                                    transitions[1] = elem.getAttributeValue(TO_A);
443                                                }
444                                                else {
445                                                    if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
446                                                        continue;
447                                                    }
448                                                    else {
449                                                        eActionConf = elem;
450                                                        handleGlobal(ns, global, configDefault, elem);
451                                                        }
452                                                }
453                                            }
454                                        }
455
456                                        String credStr = eNode.getAttributeValue(CRED_A);
457                                        String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
458                                        String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
459                                        try {
460                                            if (!StringUtils.isEmpty(userRetryMaxStr)) {
461                                                userRetryMaxStr = ELUtils.resolveAppName(userRetryMaxStr, jobConf);
462                                            }
463                                            if (!StringUtils.isEmpty(userRetryIntervalStr)) {
464                                                userRetryIntervalStr = ELUtils.resolveAppName(userRetryIntervalStr,
465                                                        jobConf);
466                                            }
467                                        }
468                                        catch (Exception e) {
469                                            throw new WorkflowException(ErrorCode.E0703, e.getMessage());
470                                        }
471
472                                        String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
473                                        def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
474                                                                      transitions[0], transitions[1], credStr,
475                                                                      userRetryMaxStr, userRetryIntervalStr));
476                                    }
477                                    else {
478                                        if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
479                                            // No operation is required
480                                        }
481                                        else {
482                                            if (eNode.getName().equals(GLOBAL)) {
483                                                global = eNode;
484                                            }
485                                            else {
486                                                if (eNode.getName().equals(PARAMETERS)) {
487                                                    // No operation is required
488                                                }
489                                                else {
490                                                    throw new WorkflowException(ErrorCode.E0703, eNode.getName());
491                                                }
492                                            }
493                                        }
494                                    }
495                                }
496                            }
497                        }
498                    }
499                }
500            }
501        }
502        return def;
503    }
504
505    /**
506     * Validate workflow xml
507     *
508     * @param app
509     * @param node
510     * @param traversed
511     * @throws WorkflowException
512     */
513    private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
514        if (node instanceof StartNodeDef) {
515            startNode = (StartNodeDef) node;
516        }
517        else {
518            try {
519                ParamChecker.validateActionName(node.getName());
520            }
521            catch (IllegalArgumentException ex) {
522                throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
523            }
524        }
525        if (node instanceof ActionNodeDef) {
526            try {
527                Element action = XmlUtils.parseXml(node.getConf());
528                boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
529                if (!supportedAction) {
530                    throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
531                }
532            }
533            catch (JDOMException ex) {
534                throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
535            }
536        }
537
538        if(node instanceof ForkNodeDef){
539            forkList.add(node.getName());
540        }
541
542        if(node instanceof JoinNodeDef){
543            joinList.add(node.getName());
544        }
545
546        if (node instanceof EndNodeDef) {
547            traversed.put(node.getName(), VisitStatus.VISITED);
548            return;
549        }
550        if (node instanceof KillNodeDef) {
551            traversed.put(node.getName(), VisitStatus.VISITED);
552            return;
553        }
554        for (String transition : node.getTransitions()) {
555
556            if (app.getNode(transition) == null) {
557                throw new WorkflowException(ErrorCode.E0708, node.getName(), transition);
558            }
559
560            //check if it is a cycle
561            if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) {
562                throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName());
563            }
564            //ignore validated one
565            if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) {
566                continue;
567            }
568
569            traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING);
570            validate(app, app.getNode(transition), traversed);
571        }
572        traversed.put(node.getName(), VisitStatus.VISITED);
573    }
574
575    /**
576     * Handle the global section
577     *
578     * @param ns
579     * @param global
580     * @param eActionConf
581     * @throws WorkflowException
582     */
583
584    @SuppressWarnings("unchecked")
585    private void handleGlobal(Namespace ns, Element global, Configuration configDefault, Element eActionConf)
586            throws WorkflowException {
587
588        // Use the action's namespace when getting children of the action (will
589        // be different than ns for extension actions)
590        Namespace actionNs = eActionConf.getNamespace();
591
592        if (global != null) {
593            Element globalJobTracker = global.getChild("job-tracker", ns);
594            Element globalNameNode = global.getChild("name-node", ns);
595            List<Element> globalJobXml = global.getChildren("job-xml", ns);
596            Element globalConfiguration = global.getChild("configuration", ns);
597
598            if (globalJobTracker != null && eActionConf.getChild("job-tracker", actionNs) == null) {
599                Element jobTracker = new Element("job-tracker", actionNs);
600                jobTracker.setText(globalJobTracker.getText());
601                eActionConf.addContent(jobTracker);
602            }
603
604            if (globalNameNode != null && eActionConf.getChild("name-node", actionNs) == null) {
605                Element nameNode = new Element("name-node", actionNs);
606                nameNode.setText(globalNameNode.getText());
607                eActionConf.addContent(nameNode);
608            }
609
610            if (!globalJobXml.isEmpty()) {
611                List<Element> actionJobXml = eActionConf.getChildren("job-xml", actionNs);
612                for(Element jobXml: globalJobXml){
613                    boolean alreadyExists = false;
614                    for(Element actionXml: actionJobXml){
615                        if(jobXml.getText().equals(actionXml.getText())){
616                            alreadyExists = true;
617                            break;
618                        }
619                    }
620
621                    if (!alreadyExists){
622                        Element ejobXml = new Element("job-xml", actionNs);
623                        ejobXml.setText(jobXml.getText());
624                        eActionConf.addContent(ejobXml);
625                    }
626
627                }
628            }
629            try {
630                XConfiguration actionConf = new XConfiguration();
631                if (configDefault != null)
632                    XConfiguration.copy(configDefault, actionConf);
633                if (globalConfiguration != null) {
634                    Configuration globalConf = new XConfiguration(new StringReader(XmlUtils.prettyPrint(
635                            globalConfiguration).toString()));
636                    XConfiguration.copy(globalConf, actionConf);
637                }
638                Element actionConfiguration = eActionConf.getChild("configuration", actionNs);
639                if (actionConfiguration != null) {
640                    //copy and override
641                    XConfiguration.copy(new XConfiguration(new StringReader(XmlUtils.prettyPrint(
642                            actionConfiguration).toString())), actionConf);
643                }
644                int position = eActionConf.indexOf(actionConfiguration);
645                eActionConf.removeContent(actionConfiguration); //replace with enhanced one
646                Element eConfXml = XmlUtils.parseXml(actionConf.toXmlString(false));
647                eConfXml.detach();
648                eConfXml.setNamespace(actionNs);
649                if (position > 0) {
650                    eActionConf.addContent(position, eConfXml);
651                }
652                else {
653                    eActionConf.addContent(eConfXml);
654                }
655            }
656            catch (IOException e) {
657                throw new WorkflowException(ErrorCode.E0700, "Error while processing action conf");
658            }
659            catch (JDOMException e) {
660                throw new WorkflowException(ErrorCode.E0700, "Error while processing action conf");
661            }
662        }
663        else {
664            ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(eActionConf.getName());
665            if (ae == null) {
666                throw new WorkflowException(ErrorCode.E0723, eActionConf.getName(), ActionService.class.getName());
667            }
668            if (ae.requiresNNJT) {
669
670                if (eActionConf.getChild("name-node", actionNs) == null) {
671                    throw new WorkflowException(ErrorCode.E0701, "No name-node defined");
672                }
673                if (eActionConf.getChild("job-tracker", actionNs) == null) {
674                    throw new WorkflowException(ErrorCode.E0701, "No job-tracker defined");
675                }
676            }
677        }
678    }
679
680}