001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.workflow.lite;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.DataInput;
024import java.io.DataInputStream;
025import java.io.DataOutput;
026import java.io.DataOutputStream;
027import java.io.IOException;
028import java.io.Reader;
029import java.io.StringReader;
030import java.io.StringWriter;
031import java.util.ArrayList;
032import java.util.List;
033import java.util.zip.Deflater;
034import java.util.zip.DeflaterOutputStream;
035import java.util.zip.Inflater;
036import java.util.zip.InflaterInputStream;
037
038import javax.xml.transform.stream.StreamSource;
039import javax.xml.validation.Schema;
040import javax.xml.validation.Validator;
041
042import org.apache.commons.codec.binary.Base64;
043import org.apache.commons.lang.StringUtils;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.io.Writable;
046import org.apache.oozie.ErrorCode;
047import org.apache.oozie.action.ActionExecutor;
048import org.apache.oozie.action.hadoop.FsActionExecutor;
049import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
050import org.apache.oozie.service.ActionService;
051import org.apache.oozie.service.ConfigurationService;
052import org.apache.oozie.service.Services;
053import org.apache.oozie.util.ELUtils;
054import org.apache.oozie.util.IOUtils;
055import org.apache.oozie.util.ParameterVerifier;
056import org.apache.oozie.util.ParameterVerifierException;
057import org.apache.oozie.util.WritableUtils;
058import org.apache.oozie.util.XConfiguration;
059import org.apache.oozie.util.XmlUtils;
060import org.apache.oozie.workflow.WorkflowException;
061import org.jdom.Element;
062import org.jdom.JDOMException;
063import org.jdom.Namespace;
064import org.xml.sax.SAXException;
065
066/**
067 * Class to parse and validate workflow xml
068 */
069public class LiteWorkflowAppParser {
070
071    private static final String DECISION_E = "decision";
072    private static final String ACTION_E = "action";
073    private static final String END_E = "end";
074    private static final String START_E = "start";
075    private static final String JOIN_E = "join";
076    private static final String FORK_E = "fork";
077    private static final Object KILL_E = "kill";
078
079    private static final String SLA_INFO = "info";
080    private static final String CREDENTIALS = "credentials";
081    private static final String GLOBAL = "global";
082    private static final String PARAMETERS = "parameters";
083
084    private static final String NAME_A = "name";
085    private static final String CRED_A = "cred";
086    private static final String USER_RETRY_MAX_A = "retry-max";
087    private static final String USER_RETRY_INTERVAL_A = "retry-interval";
088    private static final String TO_A = "to";
089    private static final String USER_RETRY_POLICY_A = "retry-policy";
090
091    private static final String FORK_PATH_E = "path";
092    private static final String FORK_START_A = "start";
093
094    private static final String ACTION_OK_E = "ok";
095    private static final String ACTION_ERROR_E = "error";
096
097    private static final String DECISION_SWITCH_E = "switch";
098    private static final String DECISION_CASE_E = "case";
099    private static final String DECISION_DEFAULT_E = "default";
100
101    private static final String SUBWORKFLOW_E = "sub-workflow";
102
103    private static final String KILL_MESSAGE_E = "message";
104    public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
105    public static final String WF_VALIDATE_FORK_JOIN = "oozie.wf.validate.ForkJoin";
106
107    public static final String DEFAULT_NAME_NODE = "oozie.actions.default.name-node";
108    public static final String DEFAULT_JOB_TRACKER = "oozie.actions.default.job-tracker";
109    public static final String OOZIE_GLOBAL = "oozie.wf.globalconf";
110
111    private static final String JOB_TRACKER = "job-tracker";
112    private static final String NAME_NODE = "name-node";
113    private static final String JOB_XML = "job-xml";
114    private static final String CONFIGURATION = "configuration";
115
116    private Schema schema;
117    private Class<? extends ControlNodeHandler> controlNodeHandler;
118    private Class<? extends DecisionNodeHandler> decisionHandlerClass;
119    private Class<? extends ActionNodeHandler> actionHandlerClass;
120
121    private String defaultNameNode;
122    private String defaultJobTracker;
123
124    public LiteWorkflowAppParser(Schema schema,
125                                 Class<? extends ControlNodeHandler> controlNodeHandler,
126                                 Class<? extends DecisionNodeHandler> decisionHandlerClass,
127                                 Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
128        this.schema = schema;
129        this.controlNodeHandler = controlNodeHandler;
130        this.decisionHandlerClass = decisionHandlerClass;
131        this.actionHandlerClass = actionHandlerClass;
132
133        defaultNameNode = ConfigurationService.get(DEFAULT_NAME_NODE);
134        if (defaultNameNode != null) {
135            defaultNameNode = defaultNameNode.trim();
136            if (defaultNameNode.isEmpty()) {
137                defaultNameNode = null;
138            }
139        }
140        defaultJobTracker = ConfigurationService.get(DEFAULT_JOB_TRACKER);
141        if (defaultJobTracker != null) {
142            defaultJobTracker = defaultJobTracker.trim();
143            if (defaultJobTracker.isEmpty()) {
144                defaultJobTracker = null;
145            }
146        }
147    }
148
149    public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf) throws WorkflowException {
150        return validateAndParse(reader, jobConf, null);
151    }
152
153    /**
154     * Parse and validate xml to {@link LiteWorkflowApp}
155     *
156     * @param reader
157     * @return LiteWorkflowApp
158     * @throws WorkflowException
159     */
160    public LiteWorkflowApp validateAndParse(Reader reader, Configuration jobConf, Configuration configDefault)
161            throws WorkflowException {
162        try {
163            StringWriter writer = new StringWriter();
164            IOUtils.copyCharStream(reader, writer);
165            String strDef = writer.toString();
166
167            if (schema != null) {
168                Validator validator = schema.newValidator();
169                validator.validate(new StreamSource(new StringReader(strDef)));
170            }
171
172            Element wfDefElement = XmlUtils.parseXml(strDef);
173            ParameterVerifier.verifyParameters(jobConf, wfDefElement);
174            LiteWorkflowApp app = parse(strDef, wfDefElement, configDefault, jobConf);
175
176
177            boolean validateForkJoin = false;
178
179            if (jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true)
180                    && ConfigurationService.getBoolean(VALIDATE_FORK_JOIN)) {
181                validateForkJoin = true;
182            }
183
184            LiteWorkflowValidator validator = new LiteWorkflowValidator();
185            validator.validateWorkflow(app, validateForkJoin);
186
187            return app;
188        }
189        catch (ParameterVerifierException ex) {
190            throw new WorkflowException(ex);
191        }
192        catch (JDOMException ex) {
193            throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
194        }
195        catch (SAXException ex) {
196            throw new WorkflowException(ErrorCode.E0701, ex.getMessage(), ex);
197        }
198        catch (IOException ex) {
199            throw new WorkflowException(ErrorCode.E0702, ex.getMessage(), ex);
200        }
201    }
202
203    /**
204     * Parse xml to {@link LiteWorkflowApp}
205     *
206     * @param strDef
207     * @param root
208     * @param configDefault
209     * @param jobConf
210     * @return LiteWorkflowApp
211     * @throws WorkflowException
212     */
213    @SuppressWarnings({"unchecked"})
214    private LiteWorkflowApp parse(String strDef, Element root, Configuration configDefault, Configuration jobConf)
215            throws WorkflowException {
216        Namespace ns = root.getNamespace();
217        LiteWorkflowApp def = null;
218        GlobalSectionData gData = jobConf.get(OOZIE_GLOBAL) == null ?
219                null : getGlobalFromString(jobConf.get(OOZIE_GLOBAL));
220        boolean serializedGlobalConf = false;
221        for (Element eNode : (List<Element>) root.getChildren()) {
222            if (eNode.getName().equals(START_E)) {
223                def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
224                                          new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
225            } else if (eNode.getName().equals(END_E)) {
226                def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
227            } else if (eNode.getName().equals(KILL_E)) {
228                def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
229                                            eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
230            } else if (eNode.getName().equals(FORK_E)) {
231                List<String> paths = new ArrayList<String>();
232                for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
233                    paths.add(tran.getAttributeValue(FORK_START_A));
234                }
235                def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths));
236            } else if (eNode.getName().equals(JOIN_E)) {
237                def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, eNode.getAttributeValue(TO_A)));
238            } else if (eNode.getName().equals(DECISION_E)) {
239                Element eSwitch = eNode.getChild(DECISION_SWITCH_E, ns);
240                List<String> transitions = new ArrayList<String>();
241                for (Element e : (List<Element>) eSwitch.getChildren(DECISION_CASE_E, ns)) {
242                    transitions.add(e.getAttributeValue(TO_A));
243                }
244                transitions.add(eSwitch.getChild(DECISION_DEFAULT_E, ns).getAttributeValue(TO_A));
245
246                String switchStatement = XmlUtils.prettyPrint(eSwitch).toString();
247                def.addNode(new DecisionNodeDef(eNode.getAttributeValue(NAME_A), switchStatement, decisionHandlerClass,
248                                                transitions));
249            } else if (ACTION_E.equals(eNode.getName())) {
250                String[] transitions = new String[2];
251                Element eActionConf = null;
252                for (Element elem : (List<Element>) eNode.getChildren()) {
253                    if (ACTION_OK_E.equals(elem.getName())) {
254                        transitions[0] = elem.getAttributeValue(TO_A);
255                    } else if (ACTION_ERROR_E.equals(elem.getName())) {
256                        transitions[1] = elem.getAttributeValue(TO_A);
257                    } else if (SLA_INFO.equals(elem.getName()) || CREDENTIALS.equals(elem.getName())) {
258                        continue;
259                    } else {
260                        if (!serializedGlobalConf && elem.getName().equals(SubWorkflowActionExecutor.ACTION_TYPE) &&
261                                elem.getChild(("propagate-configuration"), ns) != null && gData != null) {
262                            serializedGlobalConf = true;
263                            jobConf.set(OOZIE_GLOBAL, getGlobalString(gData));
264                        }
265                        eActionConf = elem;
266                        if (SUBWORKFLOW_E.equals(elem.getName())) {
267                            handleDefaultsAndGlobal(gData, null, elem);
268                        }
269                        else {
270                            handleDefaultsAndGlobal(gData, configDefault, elem);
271                        }
272                    }
273                }
274
275                String credStr = eNode.getAttributeValue(CRED_A);
276                String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
277                String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
278                String userRetryPolicyStr = eNode.getAttributeValue(USER_RETRY_POLICY_A);
279                try {
280                    if (!StringUtils.isEmpty(userRetryMaxStr)) {
281                        userRetryMaxStr = ELUtils.resolveAppName(userRetryMaxStr, jobConf);
282                    }
283                    if (!StringUtils.isEmpty(userRetryIntervalStr)) {
284                        userRetryIntervalStr = ELUtils.resolveAppName(userRetryIntervalStr, jobConf);
285                    }
286                    if (!StringUtils.isEmpty(userRetryPolicyStr)) {
287                        userRetryPolicyStr = ELUtils.resolveAppName(userRetryPolicyStr, jobConf);
288                    }
289                }
290                catch (Exception e) {
291                    throw new WorkflowException(ErrorCode.E0703, e.getMessage());
292                }
293
294                String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
295                def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
296                        transitions[0], transitions[1], credStr, userRetryMaxStr, userRetryIntervalStr,
297                        userRetryPolicyStr));
298            } else if (SLA_INFO.equals(eNode.getName()) || CREDENTIALS.equals(eNode.getName())) {
299                // No operation is required
300            } else if (eNode.getName().equals(GLOBAL)) {
301                if(jobConf.get(OOZIE_GLOBAL) != null) {
302                    gData = getGlobalFromString(jobConf.get(OOZIE_GLOBAL));
303                    handleDefaultsAndGlobal(gData, null, eNode);
304                }
305                gData = parseGlobalSection(ns, eNode);
306            } else if (eNode.getName().equals(PARAMETERS)) {
307                // No operation is required
308            } else {
309                throw new WorkflowException(ErrorCode.E0703, eNode.getName());
310            }
311        }
312        return def;
313    }
314
315    /**
316     * Read the GlobalSectionData from Base64 string.
317     * @param globalStr
318     * @return GlobalSectionData
319     * @throws WorkflowException
320     */
321    private GlobalSectionData getGlobalFromString(String globalStr) throws WorkflowException {
322        GlobalSectionData globalSectionData = new GlobalSectionData();
323        try {
324            byte[] data = Base64.decodeBase64(globalStr);
325            Inflater inflater = new Inflater();
326            DataInputStream ois = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(data), inflater));
327            globalSectionData.readFields(ois);
328            ois.close();
329        } catch (Exception ex) {
330            throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf");
331        }
332        return globalSectionData;
333    }
334
335
336    /**
337     * Write the GlobalSectionData to a Base64 string.
338     * @param globalSectionData
339     * @return String
340     * @throws WorkflowException
341     */
342    private String getGlobalString(GlobalSectionData globalSectionData) throws WorkflowException {
343        ByteArrayOutputStream baos = new ByteArrayOutputStream();
344        DataOutputStream oos = null;
345        try {
346            Deflater def = new Deflater();
347            oos = new DataOutputStream(new DeflaterOutputStream(baos, def));
348            globalSectionData.write(oos);
349            oos.close();
350        } catch (IOException e) {
351            throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf");
352        }
353        return Base64.encodeBase64String(baos.toByteArray());
354    }
355
356    private void addChildElement(Element parent, Namespace ns, String childName, String childValue) {
357        Element child = new Element(childName, ns);
358        child.setText(childValue);
359        parent.addContent(child);
360    }
361
362    private class GlobalSectionData implements Writable {
363        String jobTracker;
364        String nameNode;
365        List<String> jobXmls;
366        Configuration conf;
367
368        public GlobalSectionData() {
369        }
370
371        public GlobalSectionData(String jobTracker, String nameNode, List<String> jobXmls, Configuration conf) {
372            this.jobTracker = jobTracker;
373            this.nameNode = nameNode;
374            this.jobXmls = jobXmls;
375            this.conf = conf;
376        }
377
378        @Override
379        public void write(DataOutput dataOutput) throws IOException {
380            WritableUtils.writeStr(dataOutput, jobTracker);
381            WritableUtils.writeStr(dataOutput, nameNode);
382
383            if(jobXmls != null && !jobXmls.isEmpty()) {
384                dataOutput.writeInt(jobXmls.size());
385                for (String content : jobXmls) {
386                    WritableUtils.writeStr(dataOutput, content);
387                }
388            } else {
389                dataOutput.writeInt(0);
390            }
391            if(conf != null) {
392                WritableUtils.writeStr(dataOutput, XmlUtils.prettyPrint(conf).toString());
393            } else {
394                WritableUtils.writeStr(dataOutput, null);
395            }
396        }
397
398        @Override
399        public void readFields(DataInput dataInput) throws IOException {
400            jobTracker = WritableUtils.readStr(dataInput);
401            nameNode = WritableUtils.readStr(dataInput);
402            int length = dataInput.readInt();
403            if (length > 0) {
404                jobXmls = new ArrayList<String>();
405                for (int i = 0; i < length; i++) {
406                    jobXmls.add(WritableUtils.readStr(dataInput));
407                }
408            }
409            String confString = WritableUtils.readStr(dataInput);
410            if(confString != null) {
411                conf = new XConfiguration(new StringReader(confString));
412            }
413        }
414    }
415
416    private GlobalSectionData parseGlobalSection(Namespace ns, Element global) throws WorkflowException {
417        GlobalSectionData gData = null;
418        if (global != null) {
419            String globalJobTracker = null;
420            Element globalJobTrackerElement = global.getChild(JOB_TRACKER, ns);
421            if (globalJobTrackerElement != null) {
422                globalJobTracker = globalJobTrackerElement.getValue();
423            }
424
425            String globalNameNode = null;
426            Element globalNameNodeElement = global.getChild(NAME_NODE, ns);
427            if (globalNameNodeElement != null) {
428                globalNameNode = globalNameNodeElement.getValue();
429            }
430
431            List<String> globalJobXmls = null;
432            @SuppressWarnings("unchecked")
433            List<Element> globalJobXmlElements = global.getChildren(JOB_XML, ns);
434            if (!globalJobXmlElements.isEmpty()) {
435                globalJobXmls = new ArrayList<String>(globalJobXmlElements.size());
436                for(Element jobXmlElement: globalJobXmlElements) {
437                    globalJobXmls.add(jobXmlElement.getText());
438                }
439            }
440
441            Configuration globalConf = null;
442            Element globalConfigurationElement = global.getChild(CONFIGURATION, ns);
443            if (globalConfigurationElement != null) {
444                try {
445                    globalConf = new XConfiguration(new StringReader(XmlUtils.prettyPrint(globalConfigurationElement).toString()));
446                } catch (IOException ioe) {
447                    throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf");
448                }
449            }
450            gData = new GlobalSectionData(globalJobTracker, globalNameNode, globalJobXmls, globalConf);
451        }
452        return gData;
453    }
454
455    private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement)
456            throws WorkflowException {
457
458        ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(actionElement.getName());
459        if (ae == null && !GLOBAL.equals(actionElement.getName())) {
460            throw new WorkflowException(ErrorCode.E0723, actionElement.getName(), ActionService.class.getName());
461        }
462
463        Namespace actionNs = actionElement.getNamespace();
464
465        // If this is the global section or ActionExecutor.requiresNameNodeJobTracker() returns true, we parse the action's
466        // <name-node> and <job-tracker> fields.  If those aren't defined, we take them from the <global> section.  If those
467        // aren't defined, we take them from the oozie-site defaults.  If those aren't defined, we throw a WorkflowException.
468        // However, for the SubWorkflow and FS Actions, as well as the <global> section, we don't throw the WorkflowException.
469        // Also, we only parse the NN (not the JT) for the FS Action.
470        if (SubWorkflowActionExecutor.ACTION_TYPE.equals(actionElement.getName()) ||
471                FsActionExecutor.ACTION_TYPE.equals(actionElement.getName()) ||
472                GLOBAL.equals(actionElement.getName()) || ae.requiresNameNodeJobTracker()) {
473            if (actionElement.getChild(NAME_NODE, actionNs) == null) {
474                if (gData != null && gData.nameNode != null) {
475                    addChildElement(actionElement, actionNs, NAME_NODE, gData.nameNode);
476                } else if (defaultNameNode != null) {
477                    addChildElement(actionElement, actionNs, NAME_NODE, defaultNameNode);
478                } else if (!(SubWorkflowActionExecutor.ACTION_TYPE.equals(actionElement.getName()) ||
479                        FsActionExecutor.ACTION_TYPE.equals(actionElement.getName()) ||
480                        GLOBAL.equals(actionElement.getName()))) {
481                    throw new WorkflowException(ErrorCode.E0701, "No " + NAME_NODE + " defined");
482                }
483            }
484            if (actionElement.getChild(JOB_TRACKER, actionNs) == null &&
485                    !FsActionExecutor.ACTION_TYPE.equals(actionElement.getName())) {
486                if (gData != null && gData.jobTracker != null) {
487                    addChildElement(actionElement, actionNs, JOB_TRACKER, gData.jobTracker);
488                } else if (defaultJobTracker != null) {
489                    addChildElement(actionElement, actionNs, JOB_TRACKER, defaultJobTracker);
490                } else if (!(SubWorkflowActionExecutor.ACTION_TYPE.equals(actionElement.getName()) ||
491                        GLOBAL.equals(actionElement.getName()))) {
492                    throw new WorkflowException(ErrorCode.E0701, "No " + JOB_TRACKER + " defined");
493                }
494            }
495        }
496
497        // If this is the global section or ActionExecutor.supportsConfigurationJobXML() returns true, we parse the action's
498        // <configuration> and <job-xml> fields.  We also merge this with those from the <global> section, if given.  If none are
499        // defined, empty values are placed.  Exceptions are thrown if there's an error parsing, but not if they're not given.
500        if ( GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) {
501            @SuppressWarnings("unchecked")
502            List<Element> actionJobXmls = actionElement.getChildren(JOB_XML, actionNs);
503            if (gData != null && gData.jobXmls != null) {
504                for(String gJobXml : gData.jobXmls) {
505                    boolean alreadyExists = false;
506                    for (Element actionXml : actionJobXmls) {
507                        if (gJobXml.equals(actionXml.getText())) {
508                            alreadyExists = true;
509                            break;
510                        }
511                    }
512                    if (!alreadyExists) {
513                        Element ejobXml = new Element(JOB_XML, actionNs);
514                        ejobXml.setText(gJobXml);
515                        actionElement.addContent(ejobXml);
516                    }
517                }
518            }
519
520            try {
521                XConfiguration actionConf = new XConfiguration();
522                if (configDefault != null)
523                    XConfiguration.copy(configDefault, actionConf);
524                if (gData != null && gData.conf != null) {
525                    XConfiguration.copy(gData.conf, actionConf);
526                }
527                Element actionConfiguration = actionElement.getChild(CONFIGURATION, actionNs);
528                if (actionConfiguration != null) {
529                    //copy and override
530                    XConfiguration.copy(new XConfiguration(new StringReader(XmlUtils.prettyPrint(actionConfiguration).toString())),
531                            actionConf);
532                }
533                int position = actionElement.indexOf(actionConfiguration);
534                actionElement.removeContent(actionConfiguration); //replace with enhanced one
535                Element eConfXml = XmlUtils.parseXml(actionConf.toXmlString(false));
536                eConfXml.detach();
537                eConfXml.setNamespace(actionNs);
538                if (position > 0) {
539                    actionElement.addContent(position, eConfXml);
540                }
541                else {
542                    actionElement.addContent(eConfXml);
543                }
544            }
545            catch (IOException e) {
546                throw new WorkflowException(ErrorCode.E0700, "Error while processing action conf");
547            }
548            catch (JDOMException e) {
549                throw new WorkflowException(ErrorCode.E0700, "Error while processing action conf");
550            }
551        }
552    }
553}