001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.coord;
019
020import java.io.IOException;
021import java.io.InputStreamReader;
022import java.io.Reader;
023import java.io.StringReader;
024import java.io.StringWriter;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.ArrayList;
028import java.util.Calendar;
029import java.util.Date;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.TreeSet;
037
038import javax.xml.transform.stream.StreamSource;
039import javax.xml.validation.Validator;
040
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.oozie.CoordinatorJobBean;
045import org.apache.oozie.ErrorCode;
046import org.apache.oozie.client.CoordinatorJob;
047import org.apache.oozie.client.Job;
048import org.apache.oozie.client.OozieClient;
049import org.apache.oozie.client.CoordinatorJob.Execution;
050import org.apache.oozie.command.CommandException;
051import org.apache.oozie.command.SubmitTransitionXCommand;
052import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
053import org.apache.oozie.coord.CoordELEvaluator;
054import org.apache.oozie.coord.CoordELFunctions;
055import org.apache.oozie.coord.CoordinatorJobException;
056import org.apache.oozie.coord.TimeUnit;
057import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
058import org.apache.oozie.executor.jpa.JPAExecutorException;
059import org.apache.oozie.service.DagXLogInfoService;
060import org.apache.oozie.service.HadoopAccessorException;
061import org.apache.oozie.service.HadoopAccessorService;
062import org.apache.oozie.service.JPAService;
063import org.apache.oozie.service.SchemaService;
064import org.apache.oozie.service.Service;
065import org.apache.oozie.service.Services;
066import org.apache.oozie.service.UUIDService;
067import org.apache.oozie.service.SchemaService.SchemaName;
068import org.apache.oozie.service.UUIDService.ApplicationType;
069import org.apache.oozie.util.ConfigUtils;
070import org.apache.oozie.util.DateUtils;
071import org.apache.oozie.util.ELEvaluator;
072import org.apache.oozie.util.ELUtils;
073import org.apache.oozie.util.IOUtils;
074import org.apache.oozie.util.InstrumentUtils;
075import org.apache.oozie.util.LogUtils;
076import org.apache.oozie.util.ParamChecker;
077import org.apache.oozie.util.ParameterVerifier;
078import org.apache.oozie.util.ParameterVerifierException;
079import org.apache.oozie.util.PropertiesUtils;
080import org.apache.oozie.util.XConfiguration;
081import org.apache.oozie.util.XLog;
082import org.apache.oozie.util.XmlUtils;
083import org.jdom.Attribute;
084import org.jdom.Element;
085import org.jdom.JDOMException;
086import org.jdom.Namespace;
087import org.xml.sax.SAXException;
088
/**
 * This class provides the functionality to resolve a coordinator job XML and write the job information into a DB
 * table.
 * <p/>
 * Specifically it performs the following functions:
 * <ol>
 * <li>Resolve all the variables or properties using the job configuration.</li>
 * <li>Insert all dataset definitions as part of the {@code <data-in>} and {@code <data-out>} tags.</li>
 * <li>Validate the XML at runtime.</li>
 * </ol>
 */
097public class CoordSubmitXCommand extends SubmitTransitionXCommand {
098
099    protected Configuration conf;
100    private final String bundleId;
101    private final String coordName;
102    protected boolean dryrun;
103    protected JPAService jpaService = null;
104    private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
105
106    public static final String CONFIG_DEFAULT = "coord-config-default.xml";
107    public static final String COORDINATOR_XML_FILE = "coordinator.xml";
108    public final String COORD_INPUT_EVENTS ="input-events";
109    public final String COORD_OUTPUT_EVENTS = "output-events";
110    public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
111    public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";
112
113    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
114    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
115
116    protected CoordinatorJobBean coordJob = null;
117    /**
118     * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
119     */
120    public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
121
122    public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";
123
124    public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";
125
126    public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
127            + "coord.materialization.throttling.factor";
128
129    /**
130     * Default MAX timeout in minutes, after which coordinator input check will timeout
131     */
132    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
133
134    public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
135
136    public static final String CONF_CHECK_MAX_FREQUENCY = Service.CONF_PREFIX + "coord.check.maximum.frequency";
137
138    private ELEvaluator evalFreq = null;
139    private ELEvaluator evalNofuncs = null;
140    private ELEvaluator evalData = null;
141    private ELEvaluator evalInst = null;
142    private ELEvaluator evalAction = null;
143    private ELEvaluator evalSla = null;
144    private ELEvaluator evalTimeout = null;
145
146    static {
147        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
148                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
149                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
150                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
151                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
152        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
153
154        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
155        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
156        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
157    }
158
159    /**
160     * Constructor to create the Coordinator Submit Command.
161     *
162     * @param conf : Configuration for Coordinator job
163     */
164    public CoordSubmitXCommand(Configuration conf) {
165        super("coord_submit", "coord_submit", 1);
166        this.conf = ParamChecker.notNull(conf, "conf");
167        this.bundleId = null;
168        this.coordName = null;
169    }
170
171    /**
172     * Constructor to create the Coordinator Submit Command by bundle job.
173     *
174     * @param conf : Configuration for Coordinator job
175     * @param bundleId : bundle id
176     * @param coordName : coord name
177     */
178    public CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) {
179        super("coord_submit", "coord_submit", 1);
180        this.conf = ParamChecker.notNull(conf, "conf");
181        this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
182        this.coordName = ParamChecker.notEmpty(coordName, "coordName");
183    }
184
185    /**
186     * Constructor to create the Coordinator Submit Command.
187     *
188     * @param dryrun : if dryrun
189     * @param conf : Configuration for Coordinator job
190     */
191    public CoordSubmitXCommand(boolean dryrun, Configuration conf) {
192        this(conf);
193        this.dryrun = dryrun;
194    }
195
196    /* (non-Javadoc)
197     * @see org.apache.oozie.command.XCommand#execute()
198     */
199    @Override
200    protected String submit() throws CommandException {
201        LOG.info("STARTED Coordinator Submit");
202        String jobId = submitJob();
203        LOG.info("ENDED Coordinator Submit jobId=" + jobId);
204        return jobId;
205    }
206
207    protected String submitJob() throws CommandException {
208        String jobId = null;
209        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
210
211        boolean exceptionOccured = false;
212        try {
213            mergeDefaultConfig();
214
215            String appXml = readAndValidateXml();
216            coordJob.setOrigJobXml(appXml);
217            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
218
219            Element eXml = XmlUtils.parseXml(appXml);
220
221            String appNamespace = readAppNamespace(eXml);
222            coordJob.setAppNamespace(appNamespace);
223
224            ParameterVerifier.verifyParameters(conf, eXml);
225
226            appXml = XmlUtils.removeComments(appXml);
227            initEvaluators();
228            Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
229
230            validateCoordinatorJob();
231
232            // checking if the coordinator application data input/output events
233            // specify multiple data instance values in erroneous manner
234            checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
235            checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);
236
237            LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
238
239            jobId = storeToDB(appXml, eJob, coordJob);
240            // log job info for coordinator job
241            LogUtils.setLogInfo(coordJob, logInfo);
242            LOG = XLog.resetPrefix(LOG);
243
244            if (!dryrun) {
245                queueMaterializeTransitionXCommand(jobId);
246            }
247            else {
248                return getDryRun(coordJob);
249            }
250        }
251        catch (JDOMException jex) {
252            exceptionOccured = true;
253            LOG.warn("ERROR: ", jex);
254            throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
255        }
256        catch (CoordinatorJobException cex) {
257            exceptionOccured = true;
258            LOG.warn("ERROR:  ", cex);
259            throw new CommandException(cex);
260        }
261        catch (ParameterVerifierException pex) {
262            exceptionOccured = true;
263            LOG.warn("ERROR: ", pex);
264            throw new CommandException(pex);
265        }
266        catch (IllegalArgumentException iex) {
267            exceptionOccured = true;
268            LOG.warn("ERROR:  ", iex);
269            throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex);
270        }
271        catch (Exception ex) {
272            exceptionOccured = true;
273            LOG.warn("ERROR:  ", ex);
274            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
275        }
276        finally {
277            if (exceptionOccured) {
278                if (coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")) {
279                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
280                    coordJob.resetPending();
281                }
282            }
283        }
284        return jobId;
285    }
286
287    /**
288     * Gets the dryrun output.
289     *
290     * @param jobId the job id
291     * @return the dry run
292     * @throws Exception the exception
293     */
294    protected String getDryRun(CoordinatorJobBean coordJob) throws Exception{
295        Date startTime = coordJob.getStartTime();
296        long startTimeMilli = startTime.getTime();
297        long endTimeMilli = startTimeMilli + (3600 * 1000);
298        Date jobEndTime = coordJob.getEndTime();
299        Date endTime = new Date(endTimeMilli);
300        if (endTime.compareTo(jobEndTime) > 0) {
301            endTime = jobEndTime;
302        }
303        String jobId = coordJob.getId();
304        LOG.info("[" + jobId + "]: Update status to RUNNING");
305        coordJob.setStatus(Job.Status.RUNNING);
306        coordJob.setPending();
307        CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
308                endTime);
309        Configuration jobConf = null;
310        try {
311            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
312        }
313        catch (IOException e1) {
314            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
315        }
316        String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
317        String output = coordJob.getJobXml() + System.getProperty("line.separator")
318        + "***actions for instance***" + action;
319        return output;
320    }
321
322    /**
323     * Queue MaterializeTransitionXCommand
324     */
325    protected void queueMaterializeTransitionXCommand(String jobId) {
326        // submit a command to materialize jobs for the next 1 hour (3600 secs)
327        // so we don't wait 10 mins for the Service to run.
328        queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
329    }
330
331    /**
332     * Method that validates values in the definition for correctness. Placeholder to add more.
333     */
334    private void validateCoordinatorJob() throws Exception {
335        // check if startTime < endTime
336        if (!coordJob.getStartTime().before(coordJob.getEndTime())) {
337            throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time.");
338        }
339
340        try {
341            // Check if a coord job with cron frequency will materialize actions
342            int freq = Integer.parseInt(coordJob.getFrequency());
343
344            // Check if the frequency is faster than 5 min if enabled
345            if (Services.get().getConf().getBoolean(CONF_CHECK_MAX_FREQUENCY, true)) {
346                CoordinatorJob.Timeunit unit = coordJob.getTimeUnit();
347                if (freq == 0 || (freq < 5 && unit == CoordinatorJob.Timeunit.MINUTE)) {
348                    throw new IllegalArgumentException("Coordinator job with frequency [" + freq +
349                            "] minutes is faster than allowed maximum of 5 minutes ("
350                            + CONF_CHECK_MAX_FREQUENCY + " is set to true)");
351                }
352            }
353        } catch (NumberFormatException e) {
354            Date start = coordJob.getStartTime();
355            Calendar cal = Calendar.getInstance();
356            cal.setTime(start);
357            cal.add(Calendar.MINUTE, -1);
358            start = cal.getTime();
359
360            Date nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(start, coordJob);
361            if (!nextTime.before(coordJob.getEndTime())) {
362                throw new IllegalArgumentException("Coordinator job with frequency '" +
363                        coordJob.getFrequency() + "' materializes no actions between start and end time.");
364            }
365        }
366    }
367
368  /*
369  * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag
370  * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list
371  */
372    private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
373        Element eventsSpec, dataSpec, instance;
374        List<Element> instanceSpecList;
375        Namespace ns = eCoordJob.getNamespace();
376        String instanceValue;
377        eventsSpec = eCoordJob.getChild(eventType, ns);
378        if (eventsSpec != null) {
379            dataSpec = eventsSpec.getChild(dataType, ns);
380            if (dataSpec != null) {
381                // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
382                instanceSpecList = dataSpec.getChildren("instance", ns);
383                Iterator instanceIter = instanceSpecList.iterator();
384                while(instanceIter.hasNext()) {
385                    instance = ((Element) instanceIter.next());
386                    if(instance.getContentSize() == 0) { //empty string or whitespace
387                        throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
388                    }
389                    instanceValue = instance.getContent(0).toString();
390                    boolean isInvalid = false;
391                    try {
392                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
393                    } catch (Exception e) {
394                        handleELParseException(eventType, dataType, instanceValue);
395                    }
396                    if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
397                        handleExpresionWithMultipleInstances(eventType, dataType, instanceValue);
398                    }
399                }
400
401                // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors
402                instanceSpecList = dataSpec.getChildren("start-instance", ns);
403                instanceIter = instanceSpecList.iterator();
404                while(instanceIter.hasNext()) {
405                    instance = ((Element) instanceIter.next());
406                    if(instance.getContentSize() == 0) { //empty string or whitespace
407                        throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!");
408                    }
409                    instanceValue = instance.getContent(0).toString();
410                    boolean isInvalid = false;
411                    try {
412                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
413                    } catch (Exception e) {
414                        handleELParseException(eventType, dataType, instanceValue);
415                    }
416                    if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0
417                        handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue);
418                    }
419                }
420
421                // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors
422                instanceSpecList = dataSpec.getChildren("end-instance", ns);
423                instanceIter = instanceSpecList.iterator();
424                while(instanceIter.hasNext()) {
425                    instance = ((Element) instanceIter.next());
426                    if(instance.getContentSize() == 0) { //empty string or whitespace
427                        throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!");
428                    }
429                    instanceValue = instance.getContent(0).toString();
430                    boolean isInvalid = false;
431                    try {
432                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
433                    } catch (Exception e) {
434                        handleELParseException(eventType, dataType, instanceValue);
435                    }
436                    if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
437                        handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue);
438                    }
439                }
440
441            }
442        }
443    }
444
445    private void handleELParseException(String eventType, String dataType, String instanceValue)
446            throws CoordinatorJobException {
447        String correctAction = null;
448        if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
449            correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
450        } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
451            correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
452        }
453        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
454                + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
455    }
456
457    private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue)
458            throws CoordinatorJobException {
459        String correctAction = null;
460        if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
461            correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
462        } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
463            correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
464        }
465        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
466                + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
467    }
468
469    private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue)
470            throws CoordinatorJobException {
471        String correctAction = "Coordinator app definition should not have multiple start-instances";
472        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue
473                + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction);
474    }
475
476    private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue)
477            throws CoordinatorJobException {
478        String correctAction = "Coordinator app definition should not have multiple end-instances";
479        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue
480                + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction);
481    }
482    /**
483     * Read the application XML and validate against coordinator Schema
484     *
485     * @return validated coordinator XML
486     * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
487     */
488    protected String readAndValidateXml() throws CoordinatorJobException {
489        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
490                OozieClient.COORDINATOR_APP_PATH);
491        String coordXml = readDefinition(appPath);
492        validateXml(coordXml);
493        return coordXml;
494    }
495
496    /**
497     * Validate against Coordinator XSD file
498     *
499     * @param xmlContent : Input coordinator xml
500     * @throws CoordinatorJobException thrown if unable to validate coordinator xml
501     */
502    private void validateXml(String xmlContent) throws CoordinatorJobException {
503        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
504        Validator validator = schema.newValidator();
505        try {
506            validator.validate(new StreamSource(new StringReader(xmlContent)));
507        }
508        catch (SAXException ex) {
509            LOG.warn("SAXException :", ex);
510            throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
511        }
512        catch (IOException ex) {
513            LOG.warn("IOException :", ex);
514            throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
515        }
516    }
517
518    /**
519     * Read the application XML schema namespace
520     *
521     * @param coordXmlElement input coordinator xml Element
522     * @return app xml namespace
523     * @throws CoordinatorJobException
524     */
525    private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException {
526        Namespace ns = coordXmlElement.getNamespace();
527        if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
528            throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
529                    + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
530        }
531        if (ns != null) {
532            return ns.getURI();
533        }
534        else {
535            throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
536        }
537    }
538
539    /**
540     * Merge default configuration with user-defined configuration.
541     *
542     * @throws CommandException thrown if failed to read or merge configurations
543     */
544    protected void mergeDefaultConfig() throws CommandException {
545        Path configDefault = null;
546        try {
547            String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
548            Path coordAppPath = new Path(coordAppPathStr);
549            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
550            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
551            Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
552            FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);
553
554            // app path could be a directory
555            if (!fs.isFile(coordAppPath)) {
556                configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
557            } else {
558                configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
559            }
560
561            if (fs.exists(configDefault)) {
562                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
563                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
564                XConfiguration.injectDefaults(defaultConf, conf);
565            }
566            else {
567                LOG.info("configDefault Doesn't exist " + configDefault);
568            }
569            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
570
571            // Resolving all variables in the job properties.
572            // This ensures the Hadoop Configuration semantics is preserved.
573            XConfiguration resolvedVarsConf = new XConfiguration();
574            for (Map.Entry<String, String> entry : conf) {
575                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
576            }
577            conf = resolvedVarsConf;
578        }
579        catch (IOException e) {
580            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
581                    + configDefault, e);
582        }
583        catch (HadoopAccessorException e) {
584            throw new CommandException(e);
585        }
586        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
587    }
588
589    /**
590     * The method resolve all the variables that are defined in configuration. It also include the data set definition
591     * from dataset file into XML.
592     *
593     * @param appXml : Original job XML
594     * @param conf : Configuration of the job
595     * @param coordJob : Coordinator job bean to be populated.
596     * @return Resolved and modified job XML element.
597     * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
598     * @throws Exception thrown if failed to resolve basic entities or include referred datasets
599     */
600    public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
601    throws CoordinatorJobException, Exception {
602        Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
603        includeDataSets(basicResolvedApp, conf);
604        return basicResolvedApp;
605    }
606
607    /**
608     * Insert data set into data-in and data-out tags.
609     *
610     * @param eAppXml : coordinator application XML
611     * @param eDatasets : DataSet XML
612     */
613    @SuppressWarnings("unchecked")
614    private void insertDataSet(Element eAppXml, Element eDatasets) {
615        // Adding DS definition in the coordinator XML
616        Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
617        if (inputList != null) {
618            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
619                Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
620                dataIn.getContent().add(0, eDataset);
621            }
622        }
623        Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
624        if (outputList != null) {
625            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
626                Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
627                dataOut.getContent().add(0, eDataset);
628            }
629        }
630    }
631
632    /**
633     * Find a specific dataset from a list of Datasets.
634     *
635     * @param eDatasets : List of data sets
636     * @param name : queried data set name
637     * @return one Dataset element. otherwise throw Exception
638     */
639    @SuppressWarnings("unchecked")
640    private static Element findDataSet(Element eDatasets, String name) {
641        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
642            if (eDataset.getAttributeValue("name").equals(name)) {
643                eDataset = (Element) eDataset.clone();
644                eDataset.detach();
645                return eDataset;
646            }
647        }
648        throw new RuntimeException("undefined dataset: " + name);
649    }
650
651    /**
652     * Initialize all the required EL Evaluators.
653     */
654    protected void initEvaluators() {
655        evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
656        evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
657        evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
658        evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
659        evalTimeout = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-wait-timeout");
660    }
661
662    /**
663     * Resolve basic entities using job Configuration.
664     *
665     * @param conf :Job configuration
666     * @param appXml : Original job XML
667     * @param coordJob : Coordinator job bean to be populated.
668     * @return Resolved job XML element.
669     * @throws CoordinatorJobException thrown if failed to resolve basic entities
670     * @throws Exception thrown if failed to resolve basic entities
671     */
672    @SuppressWarnings("unchecked")
673    protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
674    throws CoordinatorJobException, Exception {
675        Element eAppXml = XmlUtils.parseXml(appXml);
676        // job's main attributes
677        // frequency
678        String val = resolveAttribute("frequency", eAppXml, evalFreq);
679        int ival = 0;
680
681        val = ParamChecker.checkFrequency(val);
682        coordJob.setFrequency(val);
683        TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
684                .getVariable("timeunit"));
685        try {
686            Integer.parseInt(val);
687        }
688        catch (NumberFormatException ex) {
689            tmp=TimeUnit.CRON;
690        }
691
692        addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
693        // TimeUnit
694        coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
695        // End Of Duration
696        tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
697                .getVariable("endOfDuration"));
698        addAnAttribute("end_of_duration", eAppXml, tmp.toString());
699        // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
700
701        // Application name
702        if (this.coordName == null) {
703            String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf);
704            coordJob.setAppName(name);
705        }
706        else {
707            // this coord job is created from bundle
708            coordJob.setAppName(this.coordName);
709        }
710
711        // start time
712        val = resolveAttribute("start", eAppXml, evalNofuncs);
713        ParamChecker.checkDateOozieTZ(val, "start");
714        coordJob.setStartTime(DateUtils.parseDateOozieTZ(val));
715        // end time
716        val = resolveAttribute("end", eAppXml, evalNofuncs);
717        ParamChecker.checkDateOozieTZ(val, "end");
718        coordJob.setEndTime(DateUtils.parseDateOozieTZ(val));
719        // Time zone
720        val = resolveAttribute("timezone", eAppXml, evalNofuncs);
721        ParamChecker.checkTimeZone(val, "timezone");
722        coordJob.setTimeZone(val);
723
724        // controls
725        val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalTimeout);
726        if (val != null && val != "") {
727            int t = Integer.parseInt(val);
728            tmp = (evalTimeout.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalTimeout
729                    .getVariable("timeunit"));
730            switch (tmp) {
731                case HOUR:
732                    val = String.valueOf(t * 60);
733                    break;
734                case DAY:
735                    val = String.valueOf(t * 60 * 24);
736                    break;
737                case MONTH:
738                    val = String.valueOf(t * 60 * 24 * 30);
739                    break;
740                default:
741                    break;
742            }
743        }
744        else {
745            val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
746        }
747
748        ival = ParamChecker.checkInteger(val, "timeout");
749        if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
750            ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
751        }
752        coordJob.setTimeout(ival);
753
754        val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
755        if (val == null || val.isEmpty()) {
756            val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1");
757        }
758        ival = ParamChecker.checkInteger(val, "concurrency");
759        coordJob.setConcurrency(ival);
760
761        val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
762        if (val == null || val.isEmpty()) {
763            int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
764            ival = defaultThrottle;
765        }
766        else {
767            ival = ParamChecker.checkInteger(val, "throttle");
768        }
769        int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
770        float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
771        int maxThrottle = (int) (maxQueue * factor);
772        if (ival > maxThrottle || ival < 1) {
773            ival = maxThrottle;
774        }
775        LOG.debug("max throttle " + ival);
776        coordJob.setMatThrottling(ival);
777
778        val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
779        if (val == "") {
780            val = Execution.FIFO.toString();
781        }
782        coordJob.setExecutionOrder(Execution.valueOf(val));
783        String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString(),
784            Execution.NONE.toString()};
785        ParamChecker.isMember(val, acceptedVals, "execution");
786
787        // datasets
788        resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
789        // for each data set
790        resolveDataSets(eAppXml);
791        HashMap<String, String> dataNameList = new HashMap<String, String>();
792        resolveIODataset(eAppXml);
793        resolveIOEvents(eAppXml, dataNameList);
794
795        resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
796                eAppXml.getNamespace()), evalNofuncs);
797        // TODO: If action or workflow tag is missing, NullPointerException will
798        // occur
799        Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
800                eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
801        evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
802        if (configElem != null) {
803            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
804                resolveTagContents("name", propElem, evalData);
805                // Want to check the data-integrity but don't want to modify the
806                // XML
807                // for properties only
808                Element tmpProp = (Element) propElem.clone();
809                resolveTagContents("value", tmpProp, evalData);
810            }
811        }
812        evalSla = CoordELEvaluator.createELEvaluatorForDataAndConf(conf, "coord-sla-submit", dataNameList);
813        resolveSLA(eAppXml, coordJob);
814        return eAppXml;
815    }
816
817    /**
818     * Resolve SLA events
819     *
820     * @param eAppXml job XML
821     * @param coordJob coordinator job bean
822     * @throws CommandException thrown if failed to resolve sla events
823     */
824    private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
825        Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace()));
826
827        if (eSla != null) {
828            String slaXml = XmlUtils.prettyPrint(eSla).toString();
829            try {
830                // EL evaluation
831                slaXml = evalSla.evaluate(slaXml, String.class);
832                // Validate against semantic SXD
833                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
834            }
835            catch (Exception e) {
836                throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
837            }
838        }
839    }
840
841    /**
842     * Resolve input-events/data-in and output-events/data-out tags.
843     *
844     * @param eJobOrg : Job element
845     * @throws CoordinatorJobException thrown if failed to resolve input and output events
846     */
847    @SuppressWarnings("unchecked")
848    private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
849        // Resolving input-events/data-in
850        // Clone the job and don't update anything in the original
851        Element eJob = (Element) eJobOrg.clone();
852        Element inputList = eJob.getChild("input-events", eJob.getNamespace());
853        if (inputList != null) {
854            TreeSet<String> eventNameSet = new TreeSet<String>();
855            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
856                String dataInName = dataIn.getAttributeValue("name");
857                dataNameList.put(dataInName, "data-in");
858                // check whether there is any duplicate data-in name
859                if (eventNameSet.contains(dataInName)) {
860                    throw new RuntimeException("Duplicate dataIn name " + dataInName);
861                }
862                else {
863                    eventNameSet.add(dataInName);
864                }
865                resolveTagContents("instance", dataIn, evalInst);
866                resolveTagContents("start-instance", dataIn, evalInst);
867                resolveTagContents("end-instance", dataIn, evalInst);
868
869            }
870        }
871        // Resolving output-events/data-out
872        Element outputList = eJob.getChild("output-events", eJob.getNamespace());
873        if (outputList != null) {
874            TreeSet<String> eventNameSet = new TreeSet<String>();
875            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
876                String dataOutName = dataOut.getAttributeValue("name");
877                dataNameList.put(dataOutName, "data-out");
878                // check whether there is any duplicate data-out name
879                if (eventNameSet.contains(dataOutName)) {
880                    throw new RuntimeException("Duplicate dataIn name " + dataOutName);
881                }
882                else {
883                    eventNameSet.add(dataOutName);
884                }
885                resolveTagContents("instance", dataOut, evalInst);
886
887            }
888        }
889
890    }
891
892    /**
893     * Resolve input-events/dataset and output-events/dataset tags.
894     *
895     * @param eJob : Job element
896     * @throws CoordinatorJobException thrown if failed to resolve input and output events
897     */
898    @SuppressWarnings("unchecked")
899    private void resolveIODataset(Element eAppXml) throws CoordinatorJobException {
900        // Resolving input-events/data-in
901        Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
902        if (inputList != null) {
903            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
904                resolveAttribute("dataset", dataIn, evalInst);
905
906            }
907        }
908        // Resolving output-events/data-out
909        Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
910        if (outputList != null) {
911            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
912                resolveAttribute("dataset", dataOut, evalInst);
913
914            }
915        }
916
917    }
918
919
920    /**
921     * Add an attribute into XML element.
922     *
923     * @param attrName :attribute name
924     * @param elem : Element to add attribute
925     * @param value :Value of attribute
926     */
927    private void addAnAttribute(String attrName, Element elem, String value) {
928        elem.setAttribute(attrName, value);
929    }
930
931    /**
932     * Resolve datasets using job configuration.
933     *
934     * @param eAppXml : Job Element XML
935     * @throws Exception thrown if failed to resolve datasets
936     */
937    @SuppressWarnings("unchecked")
938    private void resolveDataSets(Element eAppXml) throws Exception {
939        Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
940        if (datasetList != null) {
941
942            List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
943            resolveDataSets(dsElems);
944            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
945                    eAppXml.getNamespace()), evalNofuncs);
946        }
947    }
948
949    /**
950     * Resolve datasets using job configuration.
951     *
952     * @param dsElems : Data set XML element.
953     * @throws CoordinatorJobException thrown if failed to resolve datasets
954     */
955    private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
956        for (Element dsElem : dsElems) {
957            // Setting up default TimeUnit and EndOFDuraion
958            evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
959            evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
960
961            String val = resolveAttribute("frequency", dsElem, evalFreq);
962            int ival = ParamChecker.checkInteger(val, "frequency");
963            ParamChecker.checkGTZero(ival, "frequency");
964            addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
965                    .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
966            addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
967                    .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
968            val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
969            ParamChecker.checkDateOozieTZ(val, "initial-instance");
970            checkInitialInstance(val);
971            val = resolveAttribute("timezone", dsElem, evalNofuncs);
972            ParamChecker.checkTimeZone(val, "timezone");
973            resolveTagContents("uri-template", dsElem, evalNofuncs);
974            resolveTagContents("done-flag", dsElem, evalNofuncs);
975        }
976    }
977
978    /**
979     * Resolve the content of a tag.
980     *
981     * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
982     * @param elem : Element where the tag exists.
983     * @param eval : EL evealuator
984     * @return Resolved tag content.
985     * @throws CoordinatorJobException thrown if failed to resolve tag content
986     */
987    @SuppressWarnings("unchecked")
988    private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
989        String ret = "";
990        if (elem != null) {
991            for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
992                if (tagElem != null) {
993                    String updated;
994                    try {
995                        updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
996
997                    }
998                    catch (Exception e) {
999                        throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
1000                    }
1001                    tagElem.removeContent();
1002                    tagElem.addContent(updated);
1003                    ret += updated;
1004                }
1005            }
1006        }
1007        return ret;
1008    }
1009
1010    /**
1011     * Resolve an attribute value.
1012     *
1013     * @param attrName : Attribute name.
1014     * @param elem : XML Element where attribute is defiend
1015     * @param eval : ELEvaluator used to resolve
1016     * @return Resolved attribute value
1017     * @throws CoordinatorJobException thrown if failed to resolve an attribute value
1018     */
1019    private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
1020        Attribute attr = elem.getAttribute(attrName);
1021        String val = null;
1022        if (attr != null) {
1023            try {
1024                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
1025            }
1026            catch (Exception e) {
1027                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
1028            }
1029            attr.setValue(val);
1030        }
1031        return val;
1032    }
1033
1034    /**
1035     * Include referred datasets into XML.
1036     *
1037     * @param resolvedXml : Job XML element.
1038     * @param conf : Job configuration
1039     * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
1040     */
1041    @SuppressWarnings("unchecked")
1042    protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
1043        Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
1044        Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
1045        List<String> dsList = new ArrayList<String>();
1046        if (datasets != null) {
1047            for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
1048                String incDSFile = includeElem.getTextTrim();
1049                includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
1050            }
1051            for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
1052                String dsName = e.getAttributeValue("name");
1053                if (dsList.contains(dsName)) {// Override with this DS
1054                    // Remove duplicate
1055                    removeDataSet(allDataSets, dsName);
1056                }
1057                else {
1058                    dsList.add(dsName);
1059                }
1060                allDataSets.addContent((Element) e.clone());
1061            }
1062        }
1063        insertDataSet(resolvedXml, allDataSets);
1064        resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
1065    }
1066
1067    /**
1068     * Include one dataset file.
1069     *
1070     * @param incDSFile : Include data set filename.
1071     * @param dsList :List of dataset names to verify the duplicate.
1072     * @param allDataSets : Element that includes all dataset definitions.
1073     * @param dsNameSpace : Data set name space
1074     * @throws CoordinatorJobException thrown if failed to include one dataset file
1075     */
1076    @SuppressWarnings("unchecked")
1077    private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
1078    throws CoordinatorJobException {
1079        Element tmpDataSets = null;
1080        try {
1081            String dsXml = readDefinition(incDSFile);
1082            LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
1083            tmpDataSets = XmlUtils.parseXml(dsXml);
1084        }
1085        catch (JDOMException e) {
1086            LOG.warn("Error parsing included dataset [{0}].  Message [{1}]", incDSFile, e.getMessage());
1087            throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
1088        }
1089        resolveDataSets(tmpDataSets.getChildren("dataset"));
1090        for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
1091            String dsName = e.getAttributeValue("name");
1092            if (dsList.contains(dsName)) {
1093                throw new RuntimeException("Duplicate Dataset " + dsName);
1094            }
1095            dsList.add(dsName);
1096            Element tmp = (Element) e.clone();
1097            // TODO: Don't like to over-write the external/include DS's namespace
1098            tmp.setNamespace(dsNameSpace);
1099            tmp.getChild("uri-template").setNamespace(dsNameSpace);
1100            if (e.getChild("done-flag") != null) {
1101                tmp.getChild("done-flag").setNamespace(dsNameSpace);
1102            }
1103            allDataSets.addContent(tmp);
1104        }
1105        // nested include
1106        for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
1107            String incFile = includeElem.getTextTrim();
1108            includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
1109        }
1110    }
1111
1112    /**
1113     * Remove a dataset from a list of dataset.
1114     *
1115     * @param eDatasets : List of dataset
1116     * @param name : Dataset name to be removed.
1117     */
1118    @SuppressWarnings("unchecked")
1119    private static void removeDataSet(Element eDatasets, String name) {
1120        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
1121            if (eDataset.getAttributeValue("name").equals(name)) {
1122                eDataset.detach();
1123                return;
1124            }
1125        }
1126        throw new RuntimeException("undefined dataset: " + name);
1127    }
1128
1129    /**
1130     * Read coordinator definition.
1131     *
1132     * @param appPath application path.
1133     * @return coordinator definition.
1134     * @throws CoordinatorJobException thrown if the definition could not be read.
1135     */
1136    protected String readDefinition(String appPath) throws CoordinatorJobException {
1137        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
1138        // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
1139        try {
1140            URI uri = new URI(appPath);
1141            LOG.debug("user =" + user);
1142            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1143            Configuration fsConf = has.createJobConf(uri.getAuthority());
1144            FileSystem fs = has.createFileSystem(user, uri, fsConf);
1145            Path appDefPath = null;
1146
1147            // app path could be a directory
1148            Path path = new Path(uri.getPath());
1149            // check file exists for dataset include file, app xml already checked
1150            if (!fs.exists(path)) {
1151                throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
1152            }
1153            if (!fs.isFile(path)) {
1154                appDefPath = new Path(path, COORDINATOR_XML_FILE);
1155            } else {
1156                appDefPath = path;
1157            }
1158
1159            Reader reader = new InputStreamReader(fs.open(appDefPath));
1160            StringWriter writer = new StringWriter();
1161            IOUtils.copyCharStream(reader, writer);
1162            return writer.toString();
1163        }
1164        catch (IOException ex) {
1165            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
1166            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1167        }
1168        catch (URISyntaxException ex) {
1169            LOG.warn("URISyException :" + ex.getMessage());
1170            throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
1171        }
1172        catch (HadoopAccessorException ex) {
1173            throw new CoordinatorJobException(ex);
1174        }
1175        catch (Exception ex) {
1176            LOG.warn("Exception :", ex);
1177            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1178        }
1179    }
1180
1181    /**
1182     * Write a coordinator job into database
1183     *
1184     *@param appXML : Coordinator definition xml
1185     * @param eJob : XML element of job
1186     * @param coordJob : Coordinator job bean
1187     * @return Job id
1188     * @throws CommandException thrown if unable to save coordinator job to db
1189     */
1190    protected String storeToDB(String appXML, Element eJob, CoordinatorJobBean coordJob) throws CommandException {
1191        String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
1192        coordJob.setId(jobId);
1193
1194        coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
1195        coordJob.setCreatedTime(new Date());
1196        coordJob.setUser(conf.get(OozieClient.USER_NAME));
1197        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
1198        coordJob.setGroup(group);
1199        coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
1200        coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
1201        coordJob.setLastActionNumber(0);
1202        coordJob.setLastModifiedTime(new Date());
1203
1204        if (!dryrun) {
1205            coordJob.setLastModifiedTime(new Date());
1206            try {
1207                CoordJobQueryExecutor.getInstance().insert(coordJob);
1208            }
1209            catch (JPAExecutorException jpaee) {
1210                coordJob.setId(null);
1211                coordJob.setStatus(CoordinatorJob.Status.FAILED);
1212                throw new CommandException(jpaee);
1213            }
1214        }
1215        return jobId;
1216    }
1217
1218    /*
1219     * this method checks if the initial-instance specified for a particular
1220       is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC
1221     */
1222    private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
1223        Date initialInstance, givenInstance;
1224        try {
1225            initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
1226            givenInstance = DateUtils.parseDateOozieTZ(val);
1227        }
1228        catch (Exception e) {
1229            throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val +
1230                                               "' to Date object. ",e);
1231        }
1232        if(givenInstance.compareTo(initialInstance) < 0) {
1233            throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val +
1234                    " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance));
1235        }
1236    }
1237
1238    /* (non-Javadoc)
1239     * @see org.apache.oozie.command.XCommand#getEntityKey()
1240     */
1241    @Override
1242    public String getEntityKey() {
1243        return null;
1244    }
1245
1246    /* (non-Javadoc)
1247     * @see org.apache.oozie.command.XCommand#isLockRequired()
1248     */
1249    @Override
1250    protected boolean isLockRequired() {
1251        return false;
1252    }
1253
1254    /* (non-Javadoc)
1255     * @see org.apache.oozie.command.XCommand#loadState()
1256     */
1257    @Override
1258    protected void loadState() throws CommandException {
1259        jpaService = Services.get().get(JPAService.class);
1260        if (jpaService == null) {
1261            throw new CommandException(ErrorCode.E0610);
1262        }
1263        coordJob = new CoordinatorJobBean();
1264        if (this.bundleId != null) {
1265            // this coord job is created from bundle
1266            coordJob.setBundleId(this.bundleId);
1267            // first use bundle id if submit thru bundle
1268            logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
1269            LogUtils.setLogInfo(logInfo);
1270        }
1271        if (this.coordName != null) {
1272            // this coord job is created from bundle
1273            coordJob.setAppName(this.coordName);
1274        }
1275        setJob(coordJob);
1276
1277    }
1278
1279    /* (non-Javadoc)
1280     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
1281     */
1282    @Override
1283    protected void verifyPrecondition() throws CommandException {
1284
1285    }
1286
1287    /* (non-Javadoc)
1288     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1289     */
1290    @Override
1291    public void notifyParent() throws CommandException {
1292        // update bundle action
1293        if (coordJob.getBundleId() != null) {
1294            LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1295            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1296            bundleStatusUpdate.call();
1297        }
1298    }
1299
1300    /* (non-Javadoc)
1301     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
1302     */
1303    @Override
1304    public void updateJob() throws CommandException {
1305    }
1306
1307    /* (non-Javadoc)
1308     * @see org.apache.oozie.command.TransitionXCommand#getJob()
1309     */
1310    @Override
1311    public Job getJob() {
1312        return coordJob;
1313    }
1314
1315    @Override
1316    public void performWrites() throws CommandException {
1317    }
1318}