001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.io.IOException;
022import java.io.InputStreamReader;
023import java.io.Reader;
024import java.io.StringReader;
025import java.io.StringWriter;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.util.ArrayList;
029import java.util.Calendar;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.TreeSet;
038
039import javax.xml.transform.stream.StreamSource;
040import javax.xml.validation.Validator;
041
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.oozie.CoordinatorJobBean;
046import org.apache.oozie.ErrorCode;
047import org.apache.oozie.client.CoordinatorJob;
048import org.apache.oozie.client.Job;
049import org.apache.oozie.client.OozieClient;
050import org.apache.oozie.client.CoordinatorJob.Execution;
051import org.apache.oozie.command.CommandException;
052import org.apache.oozie.command.SubmitTransitionXCommand;
053import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
054import org.apache.oozie.coord.CoordELEvaluator;
055import org.apache.oozie.coord.CoordELFunctions;
056import org.apache.oozie.coord.CoordUtils;
057import org.apache.oozie.coord.CoordinatorJobException;
058import org.apache.oozie.coord.TimeUnit;
059import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
060import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
061import org.apache.oozie.executor.jpa.JPAExecutorException;
062import org.apache.oozie.service.CoordMaterializeTriggerService;
063import org.apache.oozie.service.ConfigurationService;
064import org.apache.oozie.service.HadoopAccessorException;
065import org.apache.oozie.service.HadoopAccessorService;
066import org.apache.oozie.service.JPAService;
067import org.apache.oozie.service.SchemaService;
068import org.apache.oozie.service.Service;
069import org.apache.oozie.service.Services;
070import org.apache.oozie.service.UUIDService;
071import org.apache.oozie.service.SchemaService.SchemaName;
072import org.apache.oozie.service.UUIDService.ApplicationType;
073import org.apache.oozie.util.ConfigUtils;
074import org.apache.oozie.util.DateUtils;
075import org.apache.oozie.util.ELEvaluator;
076import org.apache.oozie.util.ELUtils;
077import org.apache.oozie.util.IOUtils;
078import org.apache.oozie.util.InstrumentUtils;
079import org.apache.oozie.util.LogUtils;
080import org.apache.oozie.util.ParamChecker;
081import org.apache.oozie.util.ParameterVerifier;
082import org.apache.oozie.util.ParameterVerifierException;
083import org.apache.oozie.util.PropertiesUtils;
084import org.apache.oozie.util.XConfiguration;
085import org.apache.oozie.util.XmlUtils;
086import org.jdom.Attribute;
087import org.jdom.Element;
088import org.jdom.JDOMException;
089import org.jdom.Namespace;
090import org.xml.sax.SAXException;
091
/**
 * This class resolves a coordinator job XML and writes the job information into a DB table.
 * <p>
 * Specifically it performs the following functions: 1. Resolves all the variables or properties using the job
 * configuration. 2. Inserts the dataset definitions into the &lt;data-in&gt; and &lt;data-out&gt; tags. 3. Validates
 * the XML at runtime.
 */
100public class CoordSubmitXCommand extends SubmitTransitionXCommand {
101
    /** Job configuration given at construction; {@link #mergeDefaultConfig()} replaces it with a resolved copy. */
    protected Configuration conf;
    /** Id of the submitting bundle job; {@code null} when the public constructors are used. */
    protected final String bundleId;
    /** Coordinator name supplied by the bundle; {@code null} when the public constructors are used. */
    protected final String coordName;
    /** When {@code true}, {@link #submitJob()} returns the dry-run output instead of queueing materialization. */
    protected boolean dryrun;
    // NOTE(review): jpaService and prevStatus are not assigned in the part of the class reviewed here.
    protected JPAService jpaService = null;
    private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;

    /** File name of the optional default configuration looked up next to the coordinator application. */
    public static final String CONFIG_DEFAULT = "coord-config-default.xml";
    public static final String COORDINATOR_XML_FILE = "coordinator.xml";
    // Element names used when checking the input/output events of the coordinator definition.
    public final String COORD_INPUT_EVENTS ="input-events";
    public final String COORD_OUTPUT_EVENTS = "output-events";
    public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
    public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";

    /** Property names rejected when present in the user-supplied job configuration. */
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
    /** Property names rejected when present in the default configuration file. */
    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

    /** Bean populated during submission; starts as {@code null} and is assigned outside the code reviewed here. */
    protected CoordinatorJobBean coordJob = null;
    /**
     * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
     */
    public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";

    public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";

    public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";

    public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
            + "coord.materialization.throttling.factor";

    /**
     * Default MAX timeout in minutes, after which coordinator input check will timeout
     */
    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";

    public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";

    /** Boolean switch read by the frequency validation: when true, very short frequencies are rejected. */
    public static final String CONF_CHECK_MAX_FREQUENCY = Service.CONF_PREFIX + "coord.check.maximum.frequency";

    // EL evaluators; most are created by initEvaluators().
    // NOTE(review): evalData and evalSla are not created there — confirm where they are initialised.
    private ELEvaluator evalFreq = null;
    private ELEvaluator evalNofuncs = null;
    private ELEvaluator evalData = null;
    private ELEvaluator evalInst = null;
    private ELEvaluator evalAction = null;
    private ELEvaluator evalSla = null;
    private ELEvaluator evalTimeout = null;
    private ELEvaluator evalInitialInstance = null;
149
150    static {
151        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
152                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
153                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
154                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
155                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
156        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
157
158        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
159        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
160        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
161    }
162
163    /**
164     * Constructor to create the Coordinator Submit Command.
165     *
166     * @param conf : Configuration for Coordinator job
167     */
168    public CoordSubmitXCommand(Configuration conf) {
169        super("coord_submit", "coord_submit", 1);
170        this.conf = ParamChecker.notNull(conf, "conf");
171        this.bundleId = null;
172        this.coordName = null;
173    }
174
175    /**
176     * Constructor to create the Coordinator Submit Command by bundle job.
177     *
178     * @param conf : Configuration for Coordinator job
179     * @param bundleId : bundle id
180     * @param coordName : coord name
181     */
182    protected CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) {
183        super("coord_submit", "coord_submit", 1);
184        this.conf = ParamChecker.notNull(conf, "conf");
185        this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
186        this.coordName = ParamChecker.notEmpty(coordName, "coordName");
187    }
188
189    /**
190     * Constructor to create the Coordinator Submit Command.
191     *
192     * @param dryrun : if dryrun
193     * @param conf : Configuration for Coordinator job
194     */
195    public CoordSubmitXCommand(boolean dryrun, Configuration conf) {
196        this(conf);
197        this.dryrun = dryrun;
198    }
199
200    /* (non-Javadoc)
201     * @see org.apache.oozie.command.XCommand#execute()
202     */
203    @Override
204    protected String submit() throws CommandException {
205        LOG.info("STARTED Coordinator Submit");
206        String jobId = submitJob();
207        LOG.info("ENDED Coordinator Submit jobId=" + jobId);
208        return jobId;
209    }
210
211    protected String submitJob() throws CommandException {
212        String jobId = null;
213        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
214
215        boolean exceptionOccured = false;
216        try {
217            mergeDefaultConfig();
218
219            String appXml = readAndValidateXml();
220            coordJob.setOrigJobXml(appXml);
221            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
222
223            Element eXml = XmlUtils.parseXml(appXml);
224
225            String appNamespace = readAppNamespace(eXml);
226            coordJob.setAppNamespace(appNamespace);
227
228            ParameterVerifier.verifyParameters(conf, eXml);
229
230            appXml = XmlUtils.removeComments(appXml);
231            initEvaluators();
232            Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
233
234            validateCoordinatorJob();
235
236            // checking if the coordinator application data input/output events
237            // specify multiple data instance values in erroneous manner
238            checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
239            checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);
240
241            LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
242
243            jobId = storeToDB(appXml, eJob, coordJob);
244            // log job info for coordinator job
245            LogUtils.setLogInfo(coordJob);
246
247            if (!dryrun) {
248                queueMaterializeTransitionXCommand(jobId);
249            }
250            else {
251                return getDryRun(coordJob);
252            }
253        }
254        catch (JDOMException jex) {
255            exceptionOccured = true;
256            LOG.warn("ERROR: ", jex);
257            throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
258        }
259        catch (CoordinatorJobException cex) {
260            exceptionOccured = true;
261            LOG.warn("ERROR:  ", cex);
262            throw new CommandException(cex);
263        }
264        catch (ParameterVerifierException pex) {
265            exceptionOccured = true;
266            LOG.warn("ERROR: ", pex);
267            throw new CommandException(pex);
268        }
269        catch (IllegalArgumentException iex) {
270            exceptionOccured = true;
271            LOG.warn("ERROR:  ", iex);
272            throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex);
273        }
274        catch (Exception ex) {
275            exceptionOccured = true;
276            LOG.warn("ERROR:  ", ex);
277            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
278        }
279        finally {
280            if (exceptionOccured) {
281                if (coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")) {
282                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
283                    coordJob.resetPending();
284                }
285            }
286        }
287        return jobId;
288    }
289
290    /**
291     * Gets the dryrun output.
292     *
293     * @param coordJob the coordinatorJobBean
294     * @return the dry run
295     * @throws Exception the exception
296     */
297    protected String getDryRun(CoordinatorJobBean coordJob) throws Exception{
298        int materializationWindow = ConfigurationService
299                .getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW);
300        Date startTime = coordJob.getStartTime();
301        long startTimeMilli = startTime.getTime();
302        long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
303        Date jobEndTime = coordJob.getEndTime();
304        Date endTime = new Date(endTimeMilli);
305        if (endTime.compareTo(jobEndTime) > 0) {
306            endTime = jobEndTime;
307        }
308        String jobId = coordJob.getId();
309        LOG.info("[" + jobId + "]: Update status to RUNNING");
310        coordJob.setStatus(Job.Status.RUNNING);
311        coordJob.setPending();
312        Configuration jobConf = null;
313        try {
314            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
315        }
316        catch (IOException e1) {
317            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
318        }
319        String action = new CoordMaterializeTransitionXCommand(coordJob, materializationWindow, startTime,
320                endTime).materializeActions(true);
321        String output = coordJob.getJobXml() + System.getProperty("line.separator")
322        + "***actions for instance***" + action;
323        return output;
324    }
325
326    /**
327     * Queue MaterializeTransitionXCommand
328     */
329    protected void queueMaterializeTransitionXCommand(String jobId) {
330        int materializationWindow = ConfigurationService
331                .getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW);
332        queue(new CoordMaterializeTransitionXCommand(jobId, materializationWindow), 100);
333    }
334
335    /**
336     * Method that validates values in the definition for correctness. Placeholder to add more.
337     */
338    private void validateCoordinatorJob() throws Exception {
339        // check if startTime < endTime
340        if (!coordJob.getStartTime().before(coordJob.getEndTime())) {
341            throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time.");
342        }
343
344        try {
345            // Check if a coord job with cron frequency will materialize actions
346            int freq = Integer.parseInt(coordJob.getFrequency());
347
348            // Check if the frequency is faster than 5 min if enabled
349            if (ConfigurationService.getBoolean(CONF_CHECK_MAX_FREQUENCY)) {
350                CoordinatorJob.Timeunit unit = coordJob.getTimeUnit();
351                if (freq == 0 || (freq < 5 && unit == CoordinatorJob.Timeunit.MINUTE)) {
352                    throw new IllegalArgumentException("Coordinator job with frequency [" + freq +
353                            "] minutes is faster than allowed maximum of 5 minutes ("
354                            + CONF_CHECK_MAX_FREQUENCY + " is set to true)");
355                }
356            }
357        } catch (NumberFormatException e) {
358            Date start = coordJob.getStartTime();
359            Calendar cal = Calendar.getInstance();
360            cal.setTime(start);
361            cal.add(Calendar.MINUTE, -1);
362            start = cal.getTime();
363
364            Date nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(start, coordJob);
365            if (nextTime == null) {
366                throw new IllegalArgumentException("Invalid coordinator cron frequency: " + coordJob.getFrequency());
367            }
368            if (!nextTime.before(coordJob.getEndTime())) {
369                throw new IllegalArgumentException("Coordinator job with frequency '" +
370                        coordJob.getFrequency() + "' materializes no actions between start and end time.");
371            }
372        }
373    }
374
375  /*
376  * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag
377  * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list
378  */
379    private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
380        Element eventsSpec, dataSpec, instance;
381        List<Element> instanceSpecList;
382        Namespace ns = eCoordJob.getNamespace();
383        String instanceValue;
384        eventsSpec = eCoordJob.getChild(eventType, ns);
385        if (eventsSpec != null) {
386            dataSpec = eventsSpec.getChild(dataType, ns);
387            if (dataSpec != null) {
388                // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
389                instanceSpecList = dataSpec.getChildren("instance", ns);
390                Iterator instanceIter = instanceSpecList.iterator();
391                while(instanceIter.hasNext()) {
392                    instance = ((Element) instanceIter.next());
393                    if(instance.getContentSize() == 0) { //empty string or whitespace
394                        throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
395                    }
396                    instanceValue = instance.getContent(0).toString();
397                    boolean isInvalid = false;
398                    try {
399                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
400                    } catch (Exception e) {
401                        handleELParseException(eventType, dataType, instanceValue);
402                    }
403                    if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
404                        handleExpresionWithMultipleInstances(eventType, dataType, instanceValue);
405                    }
406                }
407
408                // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors
409                instanceSpecList = dataSpec.getChildren("start-instance", ns);
410                instanceIter = instanceSpecList.iterator();
411                while(instanceIter.hasNext()) {
412                    instance = ((Element) instanceIter.next());
413                    if(instance.getContentSize() == 0) { //empty string or whitespace
414                        throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!");
415                    }
416                    instanceValue = instance.getContent(0).toString();
417                    boolean isInvalid = false;
418                    try {
419                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
420                    } catch (Exception e) {
421                        handleELParseException(eventType, dataType, instanceValue);
422                    }
423                    if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0
424                        handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue);
425                    }
426                }
427
428                // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors
429                instanceSpecList = dataSpec.getChildren("end-instance", ns);
430                instanceIter = instanceSpecList.iterator();
431                while(instanceIter.hasNext()) {
432                    instance = ((Element) instanceIter.next());
433                    if(instance.getContentSize() == 0) { //empty string or whitespace
434                        throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!");
435                    }
436                    instanceValue = instance.getContent(0).toString();
437                    boolean isInvalid = false;
438                    try {
439                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
440                    } catch (Exception e) {
441                        handleELParseException(eventType, dataType, instanceValue);
442                    }
443                    if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
444                        handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue);
445                    }
446                }
447
448            }
449        }
450    }
451
452    private void handleELParseException(String eventType, String dataType, String instanceValue)
453            throws CoordinatorJobException {
454        String correctAction = null;
455        if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
456            correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
457        } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
458            correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
459        }
460        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
461                + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
462    }
463
464    private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue)
465            throws CoordinatorJobException {
466        String correctAction = null;
467        if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
468            correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
469        } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
470            correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
471        }
472        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
473                + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
474    }
475
476    private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue)
477            throws CoordinatorJobException {
478        String correctAction = "Coordinator app definition should not have multiple start-instances";
479        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue
480                + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction);
481    }
482
483    private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue)
484            throws CoordinatorJobException {
485        String correctAction = "Coordinator app definition should not have multiple end-instances";
486        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue
487                + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction);
488    }
489    /**
490     * Read the application XML and validate against coordinator Schema
491     *
492     * @return validated coordinator XML
493     * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
494     */
495    protected String readAndValidateXml() throws CoordinatorJobException {
496        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
497                OozieClient.COORDINATOR_APP_PATH);
498        String coordXml = readDefinition(appPath);
499        validateXml(coordXml);
500        return coordXml;
501    }
502
503    /**
504     * Validate against Coordinator XSD file
505     *
506     * @param xmlContent : Input coordinator xml
507     * @throws CoordinatorJobException thrown if unable to validate coordinator xml
508     */
509    private void validateXml(String xmlContent) throws CoordinatorJobException {
510        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
511        Validator validator = schema.newValidator();
512        try {
513            validator.validate(new StreamSource(new StringReader(xmlContent)));
514        }
515        catch (SAXException ex) {
516            LOG.warn("SAXException :", ex);
517            throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
518        }
519        catch (IOException ex) {
520            LOG.warn("IOException :", ex);
521            throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
522        }
523    }
524
525    /**
526     * Read the application XML schema namespace
527     *
528     * @param coordXmlElement input coordinator xml Element
529     * @return app xml namespace
530     * @throws CoordinatorJobException
531     */
532    private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException {
533        Namespace ns = coordXmlElement.getNamespace();
534        if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
535            throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
536                    + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
537        }
538        if (ns != null) {
539            return ns.getURI();
540        }
541        else {
542            throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
543        }
544    }
545
546    /**
547     * Merge default configuration with user-defined configuration.
548     *
549     * @throws CommandException thrown if failed to read or merge configurations
550     */
551    protected void mergeDefaultConfig() throws CommandException {
552        Path configDefault = null;
553        try {
554            String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
555            Path coordAppPath = new Path(coordAppPathStr);
556            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
557            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
558            Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
559            FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);
560
561            // app path could be a directory
562            if (!fs.isFile(coordAppPath)) {
563                configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
564            } else {
565                configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
566            }
567
568            if (fs.exists(configDefault)) {
569                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
570                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
571                XConfiguration.injectDefaults(defaultConf, conf);
572            }
573            else {
574                LOG.info("configDefault Doesn't exist " + configDefault);
575            }
576            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
577
578            // Resolving all variables in the job properties.
579            // This ensures the Hadoop Configuration semantics is preserved.
580            XConfiguration resolvedVarsConf = new XConfiguration();
581            for (Map.Entry<String, String> entry : conf) {
582                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
583            }
584            conf = resolvedVarsConf;
585        }
586        catch (IOException e) {
587            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
588                    + configDefault, e);
589        }
590        catch (HadoopAccessorException e) {
591            throw new CommandException(e);
592        }
593        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
594    }
595
596    /**
597     * The method resolve all the variables that are defined in configuration. It also include the data set definition
598     * from dataset file into XML.
599     *
600     * @param appXml : Original job XML
601     * @param conf : Configuration of the job
602     * @param coordJob : Coordinator job bean to be populated.
603     * @return Resolved and modified job XML element.
604     * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
605     * @throws Exception thrown if failed to resolve basic entities or include referred datasets
606     */
607    public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
608    throws CoordinatorJobException, Exception {
609        Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
610        includeDataSets(basicResolvedApp, conf);
611        return basicResolvedApp;
612    }
613
614    /**
615     * Insert data set into data-in and data-out tags.
616     *
617     * @param eAppXml : coordinator application XML
618     * @param eDatasets : DataSet XML
619     */
620    @SuppressWarnings("unchecked")
621    private void insertDataSet(Element eAppXml, Element eDatasets) {
622        // Adding DS definition in the coordinator XML
623        Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
624        if (inputList != null) {
625            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
626                Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
627                dataIn.getContent().add(0, eDataset);
628            }
629        }
630        Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
631        if (outputList != null) {
632            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
633                Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
634                dataOut.getContent().add(0, eDataset);
635            }
636        }
637    }
638
639    /**
640     * Find a specific dataset from a list of Datasets.
641     *
642     * @param eDatasets : List of data sets
643     * @param name : queried data set name
644     * @return one Dataset element. otherwise throw Exception
645     */
646    @SuppressWarnings("unchecked")
647    private static Element findDataSet(Element eDatasets, String name) {
648        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
649            if (eDataset.getAttributeValue("name").equals(name)) {
650                eDataset = (Element) eDataset.clone();
651                eDataset.detach();
652                return eDataset;
653            }
654        }
655        throw new RuntimeException("undefined dataset: " + name);
656    }
657
658    /**
659     * Initialize all the required EL Evaluators.
660     */
661    protected void initEvaluators() {
662        evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
663        evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
664        evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
665        evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
666        evalTimeout = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-wait-timeout");
667        evalInitialInstance = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-initial-instance");
668
669    }
670
671    /**
672     * Resolve basic entities using job Configuration.
673     *
674     * @param conf :Job configuration
675     * @param appXml : Original job XML
676     * @param coordJob : Coordinator job bean to be populated.
677     * @return Resolved job XML element.
678     * @throws CoordinatorJobException thrown if failed to resolve basic entities
679     * @throws Exception thrown if failed to resolve basic entities
680     */
681    @SuppressWarnings("unchecked")
682    protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
683    throws CoordinatorJobException, Exception {
684        Element eAppXml = XmlUtils.parseXml(appXml);
685        // job's main attributes
686        // frequency
687        String val = resolveAttribute("frequency", eAppXml, evalFreq);
688        int ival = 0;
689
690        val = ParamChecker.checkFrequency(val);
691        coordJob.setFrequency(val);
692        TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
693                .getVariable("timeunit"));
694        try {
695            Integer.parseInt(val);
696        }
697        catch (NumberFormatException ex) {
698            tmp=TimeUnit.CRON;
699        }
700
701        addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
702        // TimeUnit
703        coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
704        // End Of Duration
705        tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
706                .getVariable("endOfDuration"));
707        addAnAttribute("end_of_duration", eAppXml, tmp.toString());
708        // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
709
710        // Application name
711        if (this.coordName == null) {
712            String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf);
713            coordJob.setAppName(name);
714        }
715        else {
716            // this coord job is created from bundle
717            coordJob.setAppName(this.coordName);
718        }
719
720        // start time
721        val = resolveAttribute("start", eAppXml, evalNofuncs);
722        ParamChecker.checkDateOozieTZ(val, "start");
723        coordJob.setStartTime(DateUtils.parseDateOozieTZ(val));
724        // end time
725        val = resolveAttribute("end", eAppXml, evalNofuncs);
726        ParamChecker.checkDateOozieTZ(val, "end");
727        coordJob.setEndTime(DateUtils.parseDateOozieTZ(val));
728        // Time zone
729        val = resolveAttribute("timezone", eAppXml, evalNofuncs);
730        ParamChecker.checkTimeZone(val, "timezone");
731        coordJob.setTimeZone(val);
732
733        // controls
734        val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalTimeout);
735        if (val != null && val != "") {
736            int t = Integer.parseInt(val);
737            tmp = (evalTimeout.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalTimeout
738                    .getVariable("timeunit"));
739            switch (tmp) {
740                case HOUR:
741                    val = String.valueOf(t * 60);
742                    break;
743                case DAY:
744                    val = String.valueOf(t * 60 * 24);
745                    break;
746                case MONTH:
747                    val = String.valueOf(t * 60 * 24 * 30);
748                    break;
749                default:
750                    break;
751            }
752        }
753        else {
754            val = ConfigurationService.get(CONF_DEFAULT_TIMEOUT_NORMAL);
755        }
756
757        ival = ParamChecker.checkInteger(val, "timeout");
758        if (ival < 0 || ival > ConfigurationService.getInt(CONF_DEFAULT_MAX_TIMEOUT)) {
759            ival = ConfigurationService.getInt(CONF_DEFAULT_MAX_TIMEOUT);
760        }
761        coordJob.setTimeout(ival);
762
763        val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
764        if (val == null || val.isEmpty()) {
765            val = ConfigurationService.get(CONF_DEFAULT_CONCURRENCY);
766        }
767        ival = ParamChecker.checkInteger(val, "concurrency");
768        coordJob.setConcurrency(ival);
769
770        val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
771        if (val == null || val.isEmpty()) {
772            int defaultThrottle = ConfigurationService.getInt(CONF_DEFAULT_THROTTLE);
773            ival = defaultThrottle;
774        }
775        else {
776            ival = ParamChecker.checkInteger(val, "throttle");
777        }
778        int maxQueue = ConfigurationService.getInt(CONF_QUEUE_SIZE);
779        float factor = ConfigurationService.getFloat(CONF_MAT_THROTTLING_FACTOR);
780        int maxThrottle = (int) (maxQueue * factor);
781        if (ival > maxThrottle || ival < 1) {
782            ival = maxThrottle;
783        }
784        LOG.debug("max throttle " + ival);
785        coordJob.setMatThrottling(ival);
786
787        val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
788        if (val == "") {
789            val = Execution.FIFO.toString();
790        }
791        coordJob.setExecutionOrder(Execution.valueOf(val));
792        String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString(),
793            Execution.NONE.toString()};
794        ParamChecker.isMember(val, acceptedVals, "execution");
795
796        // datasets
797        resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
798        // for each data set
799        resolveDataSets(eAppXml);
800        HashMap<String, String> dataNameList = new HashMap<String, String>();
801        resolveIODataset(eAppXml);
802        resolveIOEvents(eAppXml, dataNameList);
803
804        if (CoordUtils.isInputLogicSpecified(eAppXml)) {
805            resolveInputLogic(eAppXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAppXml.getNamespace()), evalInst,
806                    dataNameList);
807        }
808
809        resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
810                eAppXml.getNamespace()), evalNofuncs);
811        // TODO: If action or workflow tag is missing, NullPointerException will
812        // occur
813        Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
814                eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
815        evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
816        if (configElem != null) {
817            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
818                resolveTagContents("name", propElem, evalData);
819                // Want to check the data-integrity but don't want to modify the
820                // XML
821                // for properties only
822                Element tmpProp = (Element) propElem.clone();
823                resolveTagContents("value", tmpProp, evalData);
824            }
825        }
826        evalSla = CoordELEvaluator.createELEvaluatorForDataAndConf(conf, "coord-sla-submit", dataNameList);
827        resolveSLA(eAppXml, coordJob);
828        return eAppXml;
829    }
830
831    /**
832     * Resolve SLA events
833     *
834     * @param eAppXml job XML
835     * @param coordJob coordinator job bean
836     * @throws CommandException thrown if failed to resolve sla events
837     */
838    private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
839        Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace()));
840
841        if (eSla != null) {
842            String slaXml = XmlUtils.prettyPrint(eSla).toString();
843            try {
844                // EL evaluation
845                slaXml = evalSla.evaluate(slaXml, String.class);
846                // Validate against semantic SXD
847                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
848            }
849            catch (Exception e) {
850                throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
851            }
852        }
853    }
854
855    /**
856     * Resolve input-events/data-in and output-events/data-out tags.
857     *
858     * @param eJobOrg : Job element
859     * @throws CoordinatorJobException thrown if failed to resolve input and output events
860     */
861    @SuppressWarnings("unchecked")
862    private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
863        // Resolving input-events/data-in
864        // Clone the job and don't update anything in the original
865        Element eJob = (Element) eJobOrg.clone();
866        Element inputList = eJob.getChild("input-events", eJob.getNamespace());
867        if (inputList != null) {
868            TreeSet<String> eventNameSet = new TreeSet<String>();
869            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
870                String dataInName = dataIn.getAttributeValue("name");
871                dataNameList.put(dataInName, "data-in");
872                // check whether there is any duplicate data-in name
873                if (eventNameSet.contains(dataInName)) {
874                    throw new RuntimeException("Duplicate dataIn name " + dataInName);
875                }
876                else {
877                    eventNameSet.add(dataInName);
878                }
879                resolveTagContents("instance", dataIn, evalInst);
880                resolveTagContents("start-instance", dataIn, evalInst);
881                resolveTagContents("end-instance", dataIn, evalInst);
882
883            }
884        }
885        // Resolving output-events/data-out
886        Element outputList = eJob.getChild("output-events", eJob.getNamespace());
887        if (outputList != null) {
888            TreeSet<String> eventNameSet = new TreeSet<String>();
889            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
890                String dataOutName = dataOut.getAttributeValue("name");
891                dataNameList.put(dataOutName, "data-out");
892                // check whether there is any duplicate data-out name
893                if (eventNameSet.contains(dataOutName)) {
894                    throw new RuntimeException("Duplicate dataIn name " + dataOutName);
895                }
896                else {
897                    eventNameSet.add(dataOutName);
898                }
899                resolveTagContents("instance", dataOut, evalInst);
900
901            }
902        }
903
904    }
905
906    private void resolveInputLogic(Element root, ELEvaluator evalInputLogic, HashMap<String, String> dataNameList)
907            throws Exception {
908        for (Object event : root.getChildren()) {
909            Element inputElement = (Element) event;
910            resolveAttribute("dataset", inputElement, evalInputLogic);
911            String name=resolveAttribute("name", inputElement, evalInputLogic);
912            resolveAttribute("or", inputElement, evalInputLogic);
913            resolveAttribute("and", inputElement, evalInputLogic);
914            resolveAttribute("combine", inputElement, evalInputLogic);
915
916            if (name != null) {
917                dataNameList.put(name, "data-in");
918            }
919
920            if (!inputElement.getChildren().isEmpty()) {
921                resolveInputLogic(inputElement, evalInputLogic, dataNameList);
922            }
923        }
924    }
925
926    /**
927     * Resolve input-events/dataset and output-events/dataset tags.
928     *
929     * @param eJob : Job element
930     * @throws CoordinatorJobException thrown if failed to resolve input and output events
931     */
932    @SuppressWarnings("unchecked")
933    private void resolveIODataset(Element eAppXml) throws CoordinatorJobException {
934        // Resolving input-events/data-in
935        Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
936        if (inputList != null) {
937            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
938                resolveAttribute("dataset", dataIn, evalInst);
939
940            }
941        }
942        // Resolving output-events/data-out
943        Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
944        if (outputList != null) {
945            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
946                resolveAttribute("dataset", dataOut, evalInst);
947
948            }
949        }
950
951    }
952
953
954    /**
955     * Add an attribute into XML element.
956     *
957     * @param attrName :attribute name
958     * @param elem : Element to add attribute
959     * @param value :Value of attribute
960     */
961    private void addAnAttribute(String attrName, Element elem, String value) {
962        elem.setAttribute(attrName, value);
963    }
964
965    /**
966     * Resolve datasets using job configuration.
967     *
968     * @param eAppXml : Job Element XML
969     * @throws Exception thrown if failed to resolve datasets
970     */
971    @SuppressWarnings("unchecked")
972    private void resolveDataSets(Element eAppXml) throws Exception {
973        Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
974        if (datasetList != null) {
975
976            List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
977            resolveDataSets(dsElems);
978            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
979                    eAppXml.getNamespace()), evalNofuncs);
980        }
981    }
982
983    /**
984     * Resolve datasets using job configuration.
985     *
986     * @param dsElems : Data set XML element.
987     * @throws CoordinatorJobException thrown if failed to resolve datasets
988     */
989    private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
990        for (Element dsElem : dsElems) {
991            // Setting up default TimeUnit and EndOFDuraion
992            evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
993            evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
994
995            String val = resolveAttribute("frequency", dsElem, evalFreq);
996            int ival = ParamChecker.checkInteger(val, "frequency");
997            ParamChecker.checkGTZero(ival, "frequency");
998            addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
999                    .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
1000            addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
1001                    .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
1002            val = resolveAttribute("initial-instance", dsElem, evalInitialInstance);
1003            ParamChecker.checkDateOozieTZ(val, "initial-instance");
1004            checkInitialInstance(val);
1005            val = resolveAttribute("timezone", dsElem, evalNofuncs);
1006            ParamChecker.checkTimeZone(val, "timezone");
1007            resolveTagContents("uri-template", dsElem, evalNofuncs);
1008            resolveTagContents("done-flag", dsElem, evalNofuncs);
1009        }
1010    }
1011
1012    /**
1013     * Resolve the content of a tag.
1014     *
1015     * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
1016     * @param elem : Element where the tag exists.
1017     * @param eval : EL evealuator
1018     * @return Resolved tag content.
1019     * @throws CoordinatorJobException thrown if failed to resolve tag content
1020     */
1021    @SuppressWarnings("unchecked")
1022    private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
1023        String ret = "";
1024        if (elem != null) {
1025            for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
1026                if (tagElem != null) {
1027                    String updated;
1028                    try {
1029                        updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
1030
1031                    }
1032                    catch (Exception e) {
1033                        throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
1034                    }
1035                    tagElem.removeContent();
1036                    tagElem.addContent(updated);
1037                    ret += updated;
1038                }
1039            }
1040        }
1041        return ret;
1042    }
1043
1044    /**
1045     * Resolve an attribute value.
1046     *
1047     * @param attrName : Attribute name.
1048     * @param elem : XML Element where attribute is defiend
1049     * @param eval : ELEvaluator used to resolve
1050     * @return Resolved attribute value
1051     * @throws CoordinatorJobException thrown if failed to resolve an attribute value
1052     */
1053    private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
1054        Attribute attr = elem.getAttribute(attrName);
1055        String val = null;
1056        if (attr != null) {
1057            try {
1058                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
1059            }
1060            catch (Exception e) {
1061                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
1062            }
1063            attr.setValue(val);
1064        }
1065        return val;
1066    }
1067
1068    /**
1069     * Include referred datasets into XML.
1070     *
1071     * @param resolvedXml : Job XML element.
1072     * @param conf : Job configuration
1073     * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
1074     */
1075    @SuppressWarnings("unchecked")
1076    protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
1077        Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
1078        Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
1079        List<String> dsList = new ArrayList<String>();
1080        if (datasets != null) {
1081            for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
1082                String incDSFile = includeElem.getTextTrim();
1083                includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
1084            }
1085            for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
1086                String dsName = e.getAttributeValue("name");
1087                if (dsList.contains(dsName)) {// Override with this DS
1088                    // Remove duplicate
1089                    removeDataSet(allDataSets, dsName);
1090                }
1091                else {
1092                    dsList.add(dsName);
1093                }
1094                allDataSets.addContent((Element) e.clone());
1095            }
1096        }
1097        insertDataSet(resolvedXml, allDataSets);
1098        resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
1099    }
1100
1101    /**
1102     * Include one dataset file.
1103     *
1104     * @param incDSFile : Include data set filename.
1105     * @param dsList :List of dataset names to verify the duplicate.
1106     * @param allDataSets : Element that includes all dataset definitions.
1107     * @param dsNameSpace : Data set name space
1108     * @throws CoordinatorJobException thrown if failed to include one dataset file
1109     */
1110    @SuppressWarnings("unchecked")
1111    private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
1112    throws CoordinatorJobException {
1113        Element tmpDataSets = null;
1114        try {
1115            String dsXml = readDefinition(incDSFile);
1116            LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
1117            tmpDataSets = XmlUtils.parseXml(dsXml);
1118        }
1119        catch (JDOMException e) {
1120            LOG.warn("Error parsing included dataset [{0}].  Message [{1}]", incDSFile, e.getMessage());
1121            throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
1122        }
1123        resolveDataSets(tmpDataSets.getChildren("dataset"));
1124        for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
1125            String dsName = e.getAttributeValue("name");
1126            if (dsList.contains(dsName)) {
1127                throw new RuntimeException("Duplicate Dataset " + dsName);
1128            }
1129            dsList.add(dsName);
1130            Element tmp = (Element) e.clone();
1131            // TODO: Don't like to over-write the external/include DS's namespace
1132            tmp.setNamespace(dsNameSpace);
1133            tmp.getChild("uri-template").setNamespace(dsNameSpace);
1134            if (e.getChild("done-flag") != null) {
1135                tmp.getChild("done-flag").setNamespace(dsNameSpace);
1136            }
1137            allDataSets.addContent(tmp);
1138        }
1139        // nested include
1140        for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
1141            String incFile = includeElem.getTextTrim();
1142            includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
1143        }
1144    }
1145
1146    /**
1147     * Remove a dataset from a list of dataset.
1148     *
1149     * @param eDatasets : List of dataset
1150     * @param name : Dataset name to be removed.
1151     */
1152    @SuppressWarnings("unchecked")
1153    private static void removeDataSet(Element eDatasets, String name) {
1154        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
1155            if (eDataset.getAttributeValue("name").equals(name)) {
1156                eDataset.detach();
1157                return;
1158            }
1159        }
1160        throw new RuntimeException("undefined dataset: " + name);
1161    }
1162
1163    /**
1164     * Read coordinator definition.
1165     *
1166     * @param appPath application path.
1167     * @return coordinator definition.
1168     * @throws CoordinatorJobException thrown if the definition could not be read.
1169     */
1170    protected String readDefinition(String appPath) throws CoordinatorJobException {
1171        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
1172        // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
1173        try {
1174            URI uri = new URI(appPath);
1175            LOG.debug("user =" + user);
1176            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1177            Configuration fsConf = has.createJobConf(uri.getAuthority());
1178            FileSystem fs = has.createFileSystem(user, uri, fsConf);
1179            Path appDefPath = null;
1180
1181            // app path could be a directory
1182            Path path = new Path(uri.getPath());
1183            // check file exists for dataset include file, app xml already checked
1184            if (!fs.exists(path)) {
1185                throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
1186            }
1187            if (!fs.isFile(path)) {
1188                appDefPath = new Path(path, COORDINATOR_XML_FILE);
1189            } else {
1190                appDefPath = path;
1191            }
1192
1193            Reader reader = new InputStreamReader(fs.open(appDefPath));
1194            StringWriter writer = new StringWriter();
1195            IOUtils.copyCharStream(reader, writer);
1196            return writer.toString();
1197        }
1198        catch (IOException ex) {
1199            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
1200            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1201        }
1202        catch (URISyntaxException ex) {
1203            LOG.warn("URISyException :" + ex.getMessage());
1204            throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
1205        }
1206        catch (HadoopAccessorException ex) {
1207            throw new CoordinatorJobException(ex);
1208        }
1209        catch (Exception ex) {
1210            LOG.warn("Exception :", ex);
1211            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1212        }
1213    }
1214
1215    /**
1216     * Write a coordinator job into database
1217     *
1218     *@param appXML : Coordinator definition xml
1219     * @param eJob : XML element of job
1220     * @param coordJob : Coordinator job bean
1221     * @return Job id
1222     * @throws CommandException thrown if unable to save coordinator job to db
1223     */
1224    protected String storeToDB(String appXML, Element eJob, CoordinatorJobBean coordJob) throws CommandException {
1225        String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
1226        coordJob.setId(jobId);
1227
1228        coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
1229        coordJob.setCreatedTime(new Date());
1230        coordJob.setUser(conf.get(OozieClient.USER_NAME));
1231        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
1232        coordJob.setGroup(group);
1233        coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
1234        coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
1235        coordJob.setLastActionNumber(0);
1236        coordJob.setLastModifiedTime(new Date());
1237
1238        if (!dryrun) {
1239            coordJob.setLastModifiedTime(new Date());
1240            try {
1241                CoordJobQueryExecutor.getInstance().insert(coordJob);
1242            }
1243            catch (JPAExecutorException jpaee) {
1244                coordJob.setId(null);
1245                coordJob.setStatus(CoordinatorJob.Status.FAILED);
1246                throw new CommandException(jpaee);
1247            }
1248        }
1249        return jobId;
1250    }
1251
1252    /*
1253     * this method checks if the initial-instance specified for a particular
1254       is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC
1255     */
1256    private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
1257        Date initialInstance, givenInstance;
1258        try {
1259            initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
1260            givenInstance = DateUtils.parseDateOozieTZ(val);
1261        }
1262        catch (Exception e) {
1263            throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val +
1264                                               "' to Date object. ",e);
1265        }
1266        if(givenInstance.compareTo(initialInstance) < 0) {
1267            throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val +
1268                    " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance));
1269        }
1270    }
1271
    /**
     * Always returns {@code null}: this command does not provide an entity key.
     *
     * @return {@code null}
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return null;
    }
1279
    /**
     * Always returns {@code false}: this command reports that it needs no lock.
     *
     * @return {@code false}
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }
1287
1288    /* (non-Javadoc)
1289     * @see org.apache.oozie.command.XCommand#loadState()
1290     */
1291    @Override
1292    protected void loadState() throws CommandException {
1293        jpaService = Services.get().get(JPAService.class);
1294        if (jpaService == null) {
1295            throw new CommandException(ErrorCode.E0610);
1296        }
1297        coordJob = new CoordinatorJobBean();
1298        if (this.bundleId != null) {
1299            // this coord job is created from bundle
1300            coordJob.setBundleId(this.bundleId);
1301            // first use bundle id if submit thru bundle
1302            LogUtils.setLogInfo(this.bundleId);
1303        }
1304        if (this.coordName != null) {
1305            // this coord job is created from bundle
1306            coordJob.setAppName(this.coordName);
1307        }
1308        setJob(coordJob);
1309
1310    }
1311
    /**
     * Intentionally empty: this command checks no precondition.
     *
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException {

    }
1319
1320    /* (non-Javadoc)
1321     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1322     */
1323    @Override
1324    public void notifyParent() throws CommandException {
1325        // update bundle action
1326        if (coordJob.getBundleId() != null) {
1327            LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1328            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1329            bundleStatusUpdate.call();
1330        }
1331    }
1332
    /**
     * Intentionally empty: this command performs no job update in this hook.
     *
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() throws CommandException {
    }
1339
    /**
     * Return the coordinator job bean held by this command.
     *
     * @return the {@code coordJob} field
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }
1347
    /**
     * Intentionally empty: this command performs no writes in this hook.
     */
    @Override
    public void performWrites() throws CommandException {
    }
1351}