001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.InputStreamReader;
022    import java.io.Reader;
023    import java.io.StringReader;
024    import java.io.StringWriter;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.ArrayList;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.Iterator;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.Set;
035    import java.util.TreeSet;
036    
037    import javax.xml.transform.stream.StreamSource;
038    import javax.xml.validation.Validator;
039    
040    import org.apache.hadoop.conf.Configuration;
041    import org.apache.hadoop.fs.FileSystem;
042    import org.apache.hadoop.fs.Path;
043    import org.apache.oozie.CoordinatorJobBean;
044    import org.apache.oozie.ErrorCode;
045    import org.apache.oozie.client.CoordinatorJob;
046    import org.apache.oozie.client.Job;
047    import org.apache.oozie.client.OozieClient;
048    import org.apache.oozie.client.CoordinatorJob.Execution;
049    import org.apache.oozie.command.CommandException;
050    import org.apache.oozie.command.SubmitTransitionXCommand;
051    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
052    import org.apache.oozie.coord.CoordELEvaluator;
053    import org.apache.oozie.coord.CoordELFunctions;
054    import org.apache.oozie.coord.CoordinatorJobException;
055    import org.apache.oozie.coord.TimeUnit;
056    import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
057    import org.apache.oozie.executor.jpa.JPAExecutorException;
058    import org.apache.oozie.service.DagXLogInfoService;
059    import org.apache.oozie.service.HadoopAccessorException;
060    import org.apache.oozie.service.HadoopAccessorService;
061    import org.apache.oozie.service.JPAService;
062    import org.apache.oozie.service.SchemaService;
063    import org.apache.oozie.service.Service;
064    import org.apache.oozie.service.Services;
065    import org.apache.oozie.service.UUIDService;
066    import org.apache.oozie.service.SchemaService.SchemaName;
067    import org.apache.oozie.service.UUIDService.ApplicationType;
068    import org.apache.oozie.util.ConfigUtils;
069    import org.apache.oozie.util.DateUtils;
070    import org.apache.oozie.util.ELEvaluator;
071    import org.apache.oozie.util.ELUtils;
072    import org.apache.oozie.util.IOUtils;
073    import org.apache.oozie.util.InstrumentUtils;
074    import org.apache.oozie.util.LogUtils;
075    import org.apache.oozie.util.ParamChecker;
076    import org.apache.oozie.util.ParameterVerifier;
077    import org.apache.oozie.util.ParameterVerifierException;
078    import org.apache.oozie.util.PropertiesUtils;
079    import org.apache.oozie.util.XConfiguration;
080    import org.apache.oozie.util.XLog;
081    import org.apache.oozie.util.XmlUtils;
082    import org.jdom.Attribute;
083    import org.jdom.Element;
084    import org.jdom.JDOMException;
085    import org.jdom.Namespace;
086    import org.xml.sax.SAXException;
087    
/**
 * This class provides the functionality to resolve a coordinator job XML and write the job information into a DB
 * table.
 * <p/>
 * Specifically, it performs the following functions:
 * <ol>
 * <li>Resolves all the variables or properties using the job configuration.</li>
 * <li>Inserts all dataset definitions as part of the {@code <data-in>} and {@code <data-out>} tags.</li>
 * <li>Validates the XML at runtime.</li>
 * </ol>
 */
096    public class CoordSubmitXCommand extends SubmitTransitionXCommand {
097    
098        private Configuration conf;
099        private final String bundleId;
100        private final String coordName;
101        private boolean dryrun;
102        private JPAService jpaService = null;
103        private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
104    
105        public static final String CONFIG_DEFAULT = "coord-config-default.xml";
106        public static final String COORDINATOR_XML_FILE = "coordinator.xml";
107        public final String COORD_INPUT_EVENTS ="input-events";
108        public final String COORD_OUTPUT_EVENTS = "output-events";
109        public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
110        public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";
111    
112        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
113        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
114    
115        private CoordinatorJobBean coordJob = null;
116        /**
117         * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
118         */
119        public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
120    
121        public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";
122    
123        public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";
124    
125        public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
126                + "coord.materialization.throttling.factor";
127    
128        /**
129         * Default MAX timeout in minutes, after which coordinator input check will timeout
130         */
131        public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
132    
133    
134        public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
135    
136        private ELEvaluator evalFreq = null;
137        private ELEvaluator evalNofuncs = null;
138        private ELEvaluator evalData = null;
139        private ELEvaluator evalInst = null;
140        private ELEvaluator evalAction = null;
141        private ELEvaluator evalSla = null;
142    
143        static {
144            String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
145                    PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
146                    PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
147                    PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
148                    PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
149            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
150    
151            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
152            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
153            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
154        }
155    
/**
 * Creates a submit command for a coordinator that is not attached to a bundle.
 *
 * @param conf : Configuration for Coordinator job; checked with {@code ParamChecker.notNull}
 */
public CoordSubmitXCommand(Configuration conf) {
    super("coord_submit", "coord_submit", 1);
    // no bundle context for this kind of submission
    this.coordName = null;
    this.bundleId = null;
    this.conf = ParamChecker.notNull(conf, "conf");
}
167    
168        /**
169         * Constructor to create the Coordinator Submit Command by bundle job.
170         *
171         * @param conf : Configuration for Coordinator job
172         * @param bundleId : bundle id
173         * @param coordName : coord name
174         */
175        public CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) {
176            super("coord_submit", "coord_submit", 1);
177            this.conf = ParamChecker.notNull(conf, "conf");
178            this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
179            this.coordName = ParamChecker.notEmpty(coordName, "coordName");
180        }
181    
/**
 * Creates a submit command whose dryrun mode is chosen by the caller. Delegates to the
 * single-argument constructor, so no bundle id or coordinator name is set.
 *
 * @param dryrun : when true, {@link #submit()} returns the job XML and materialized actions
 *        instead of queueing a materialization command
 * @param conf : Configuration for Coordinator job
 */
public CoordSubmitXCommand(boolean dryrun, Configuration conf) {
    this(conf);
    this.dryrun = dryrun;
}
192    
193        /* (non-Javadoc)
194         * @see org.apache.oozie.command.XCommand#execute()
195         */
196        @Override
197        protected String submit() throws CommandException {
198            String jobId = null;
199            LOG.info("STARTED Coordinator Submit");
200            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
201    
202            boolean exceptionOccured = false;
203            try {
204                mergeDefaultConfig();
205    
206                String appXml = readAndValidateXml();
207                coordJob.setOrigJobXml(appXml);
208                LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
209    
210                Element eXml = XmlUtils.parseXml(appXml);
211    
212                String appNamespace = readAppNamespace(eXml);
213                coordJob.setAppNamespace(appNamespace);
214    
215                ParameterVerifier.verifyParameters(conf, eXml);
216    
217                appXml = XmlUtils.removeComments(appXml);
218                initEvaluators();
219                Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
220    
221                validateCoordinatorJob();
222    
223                // checking if the coordinator application data input/output events
224                // specify multiple data instance values in erroneous manner
225                checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
226                checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);
227    
228                LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
229    
230                jobId = storeToDB(eJob, coordJob);
231                // log job info for coordinator job
232                LogUtils.setLogInfo(coordJob, logInfo);
233                LOG = XLog.resetPrefix(LOG);
234    
235                if (!dryrun) {
236                    // submit a command to materialize jobs for the next 1 hour (3600 secs)
237                    // so we don't wait 10 mins for the Service to run.
238                    queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
239                }
240                else {
241                    Date startTime = coordJob.getStartTime();
242                    long startTimeMilli = startTime.getTime();
243                    long endTimeMilli = startTimeMilli + (3600 * 1000);
244                    Date jobEndTime = coordJob.getEndTime();
245                    Date endTime = new Date(endTimeMilli);
246                    if (endTime.compareTo(jobEndTime) > 0) {
247                        endTime = jobEndTime;
248                    }
249                    jobId = coordJob.getId();
250                    LOG.info("[" + jobId + "]: Update status to RUNNING");
251                    coordJob.setStatus(Job.Status.RUNNING);
252                    coordJob.setPending();
253                    CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
254                            endTime);
255                    Configuration jobConf = null;
256                    try {
257                        jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
258                    }
259                    catch (IOException e1) {
260                        LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
261                    }
262                    String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
263                    String output = coordJob.getJobXml() + System.getProperty("line.separator")
264                    + "***actions for instance***" + action;
265                    return output;
266                }
267            }
268            catch (JDOMException jex) {
269                exceptionOccured = true;
270                LOG.warn("ERROR: ", jex);
271                throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
272            }
273            catch (CoordinatorJobException cex) {
274                exceptionOccured = true;
275                LOG.warn("ERROR:  ", cex);
276                throw new CommandException(cex);
277            }
278            catch (ParameterVerifierException pex) {
279                exceptionOccured = true;
280                LOG.warn("ERROR: ", pex);
281                throw new CommandException(pex);
282            }
283            catch (IllegalArgumentException iex) {
284                exceptionOccured = true;
285                LOG.warn("ERROR:  ", iex);
286                throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex);
287            }
288            catch (Exception ex) {
289                exceptionOccured = true;
290                LOG.warn("ERROR:  ", ex);
291                throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
292            }
293            finally {
294                if (exceptionOccured) {
295                    if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){
296                        coordJob.setStatus(CoordinatorJob.Status.FAILED);
297                        coordJob.resetPending();
298                    }
299                }
300            }
301    
302            LOG.info("ENDED Coordinator Submit jobId=" + jobId);
303            return jobId;
304        }
305    
306        /**
307         * Method that validates values in the definition for correctness. Placeholder to add more.
308         */
309        private void validateCoordinatorJob() {
310            // check if startTime < endTime
311            if (coordJob.getStartTime().after(coordJob.getEndTime())) {
312                throw new IllegalArgumentException("Coordinator Start Time cannot be greater than End Time.");
313            }
314        }
315    
316      /*
317      * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag
318      * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list
319      */
320        private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
321            Element eventsSpec, dataSpec, instance;
322            List<Element> instanceSpecList;
323            Namespace ns = eCoordJob.getNamespace();
324            String instanceValue;
325            eventsSpec = eCoordJob.getChild(eventType, ns);
326            if (eventsSpec != null) {
327                dataSpec = eventsSpec.getChild(dataType, ns);
328                if (dataSpec != null) {
329                    // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
330                    instanceSpecList = dataSpec.getChildren("instance", ns);
331                    Iterator instanceIter = instanceSpecList.iterator();
332                    while(instanceIter.hasNext()) {
333                        instance = ((Element) instanceIter.next());
334                        if(instance.getContentSize() == 0) { //empty string or whitespace
335                            throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
336                        }
337                        instanceValue = instance.getContent(0).toString();
338                        boolean isInvalid = false;
339                        try {
340                            isInvalid = evalAction.checkForExistence(instanceValue, ",");
341                        } catch (Exception e) {
342                            handleELParseException(eventType, dataType, instanceValue);
343                        }
344                        if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
345                            handleExpresionWithMultipleInstances(eventType, dataType, instanceValue);
346                        }
347                    }
348    
349                    // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors
350                    instanceSpecList = dataSpec.getChildren("start-instance", ns);
351                    instanceIter = instanceSpecList.iterator();
352                    while(instanceIter.hasNext()) {
353                        instance = ((Element) instanceIter.next());
354                        if(instance.getContentSize() == 0) { //empty string or whitespace
355                            throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!");
356                        }
357                        instanceValue = instance.getContent(0).toString();
358                        boolean isInvalid = false;
359                        try {
360                            isInvalid = evalAction.checkForExistence(instanceValue, ",");
361                        } catch (Exception e) {
362                            handleELParseException(eventType, dataType, instanceValue);
363                        }
364                        if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0
365                            handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue);
366                        }
367                    }
368    
369                    // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors
370                    instanceSpecList = dataSpec.getChildren("end-instance", ns);
371                    instanceIter = instanceSpecList.iterator();
372                    while(instanceIter.hasNext()) {
373                        instance = ((Element) instanceIter.next());
374                        if(instance.getContentSize() == 0) { //empty string or whitespace
375                            throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!");
376                        }
377                        instanceValue = instance.getContent(0).toString();
378                        boolean isInvalid = false;
379                        try {
380                            isInvalid = evalAction.checkForExistence(instanceValue, ",");
381                        } catch (Exception e) {
382                            handleELParseException(eventType, dataType, instanceValue);
383                        }
384                        if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
385                            handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue);
386                        }
387                    }
388    
389                }
390            }
391        }
392    
393        private void handleELParseException(String eventType, String dataType, String instanceValue)
394                throws CoordinatorJobException {
395            String correctAction = null;
396            if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
397                correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
398            } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
399                correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
400            }
401            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
402                    + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
403        }
404    
405        private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue)
406                throws CoordinatorJobException {
407            String correctAction = null;
408            if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
409                correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
410            } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
411                correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
412            }
413            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
414                    + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
415        }
416    
417        private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue)
418                throws CoordinatorJobException {
419            String correctAction = "Coordinator app definition should not have multiple start-instances";
420            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue
421                    + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction);
422        }
423    
424        private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue)
425                throws CoordinatorJobException {
426            String correctAction = "Coordinator app definition should not have multiple end-instances";
427            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue
428                    + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction);
429        }
430    
431        /**
432         * Read the application XML and validate against coordinator Schema
433         *
434         * @return validated coordinator XML
435         * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
436         */
437        private String readAndValidateXml() throws CoordinatorJobException {
438            String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
439                    OozieClient.COORDINATOR_APP_PATH);
440            String coordXml = readDefinition(appPath);
441            validateXml(coordXml);
442            return coordXml;
443        }
444    
445        /**
446         * Validate against Coordinator XSD file
447         *
448         * @param xmlContent : Input coordinator xml
449         * @throws CoordinatorJobException thrown if unable to validate coordinator xml
450         */
451        private void validateXml(String xmlContent) throws CoordinatorJobException {
452            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
453            Validator validator = schema.newValidator();
454            try {
455                validator.validate(new StreamSource(new StringReader(xmlContent)));
456            }
457            catch (SAXException ex) {
458                LOG.warn("SAXException :", ex);
459                throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
460            }
461            catch (IOException ex) {
462                LOG.warn("IOException :", ex);
463                throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
464            }
465        }
466    
467        /**
468         * Read the application XML schema namespace
469         *
470         * @param coordXmlElement input coordinator xml Element
471         * @return app xml namespace
472         * @throws CoordinatorJobException
473         */
474        private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException {
475            Namespace ns = coordXmlElement.getNamespace();
476            if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
477                throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
478                        + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
479            }
480            if (ns != null) {
481                return ns.getURI();
482            }
483            else {
484                throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
485            }
486        }
487    
488        /**
489         * Merge default configuration with user-defined configuration.
490         *
491         * @throws CommandException thrown if failed to read or merge configurations
492         */
493        protected void mergeDefaultConfig() throws CommandException {
494            Path configDefault = null;
495            try {
496                String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
497                Path coordAppPath = new Path(coordAppPathStr);
498                String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
499                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
500                Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
501                FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);
502    
503                // app path could be a directory
504                if (!fs.isFile(coordAppPath)) {
505                    configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
506                } else {
507                    configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
508                }
509    
510                if (fs.exists(configDefault)) {
511                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
512                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
513                    XConfiguration.injectDefaults(defaultConf, conf);
514                }
515                else {
516                    LOG.info("configDefault Doesn't exist " + configDefault);
517                }
518                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
519    
520                // Resolving all variables in the job properties.
521                // This ensures the Hadoop Configuration semantics is preserved.
522                XConfiguration resolvedVarsConf = new XConfiguration();
523                for (Map.Entry<String, String> entry : conf) {
524                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
525                }
526                conf = resolvedVarsConf;
527            }
528            catch (IOException e) {
529                throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
530                        + configDefault, e);
531            }
532            catch (HadoopAccessorException e) {
533                throw new CommandException(e);
534            }
535            LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
536        }
537    
538        /**
539         * The method resolve all the variables that are defined in configuration. It also include the data set definition
540         * from dataset file into XML.
541         *
542         * @param appXml : Original job XML
543         * @param conf : Configuration of the job
544         * @param coordJob : Coordinator job bean to be populated.
545         * @return Resolved and modified job XML element.
546         * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
547         * @throws Exception thrown if failed to resolve basic entities or include referred datasets
548         */
549        public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
550        throws CoordinatorJobException, Exception {
551            Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
552            includeDataSets(basicResolvedApp, conf);
553            return basicResolvedApp;
554        }
555    
556        /**
557         * Insert data set into data-in and data-out tags.
558         *
559         * @param eAppXml : coordinator application XML
560         * @param eDatasets : DataSet XML
561         */
562        @SuppressWarnings("unchecked")
563        private void insertDataSet(Element eAppXml, Element eDatasets) {
564            // Adding DS definition in the coordinator XML
565            Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
566            if (inputList != null) {
567                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
568                    Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
569                    dataIn.getContent().add(0, eDataset);
570                }
571            }
572            Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
573            if (outputList != null) {
574                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
575                    Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
576                    dataOut.getContent().add(0, eDataset);
577                }
578            }
579        }
580    
581        /**
582         * Find a specific dataset from a list of Datasets.
583         *
584         * @param eDatasets : List of data sets
585         * @param name : queried data set name
586         * @return one Dataset element. otherwise throw Exception
587         */
588        @SuppressWarnings("unchecked")
589        private static Element findDataSet(Element eDatasets, String name) {
590            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
591                if (eDataset.getAttributeValue("name").equals(name)) {
592                    eDataset = (Element) eDataset.clone();
593                    eDataset.detach();
594                    return eDataset;
595                }
596            }
597            throw new RuntimeException("undefined dataset: " + name);
598        }
599    
600        /**
601         * Initialize all the required EL Evaluators.
602         */
603        protected void initEvaluators() {
604            evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
605            evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
606            evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
607            evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
608            evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
609        }
610    
611        /**
612         * Resolve basic entities using job Configuration.
613         *
614         * @param conf :Job configuration
615         * @param appXml : Original job XML
616         * @param coordJob : Coordinator job bean to be populated.
617         * @return Resolved job XML element.
618         * @throws CoordinatorJobException thrown if failed to resolve basic entities
619         * @throws Exception thrown if failed to resolve basic entities
620         */
621        @SuppressWarnings("unchecked")
622        protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
623        throws CoordinatorJobException, Exception {
624            Element eAppXml = XmlUtils.parseXml(appXml);
625            // job's main attributes
626            // frequency
627            String val = resolveAttribute("frequency", eAppXml, evalFreq);
628            int ival = ParamChecker.checkInteger(val, "frequency");
629            ParamChecker.checkGTZero(ival, "frequency");
630            coordJob.setFrequency(Integer.toString(ival));
631            TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
632                    .getVariable("timeunit"));
633            addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
634            // TimeUnit
635            coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
636            // End Of Duration
637            tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
638                    .getVariable("endOfDuration"));
639            addAnAttribute("end_of_duration", eAppXml, tmp.toString());
640            // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
641    
642            // Application name
643            if (this.coordName == null) {
644                String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf);
645                coordJob.setAppName(name);
646            }
647            else {
648                // this coord job is created from bundle
649                coordJob.setAppName(this.coordName);
650            }
651    
652            // start time
653            val = resolveAttribute("start", eAppXml, evalNofuncs);
654            ParamChecker.checkDateOozieTZ(val, "start");
655            coordJob.setStartTime(DateUtils.parseDateOozieTZ(val));
656            // end time
657            val = resolveAttribute("end", eAppXml, evalNofuncs);
658            ParamChecker.checkDateOozieTZ(val, "end");
659            coordJob.setEndTime(DateUtils.parseDateOozieTZ(val));
660            // Time zone
661            val = resolveAttribute("timezone", eAppXml, evalNofuncs);
662            ParamChecker.checkTimeZone(val, "timezone");
663            coordJob.setTimeZone(val);
664    
665            // controls
666            val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalFreq);
667            if (val == "") {
668                val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
669            }
670    
671            ival = ParamChecker.checkInteger(val, "timeout");
672            if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
673                ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
674            }
675            coordJob.setTimeout(ival);
676    
677            val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
678            if (val == null || val.isEmpty()) {
679                val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1");
680            }
681            ival = ParamChecker.checkInteger(val, "concurrency");
682            coordJob.setConcurrency(ival);
683    
684            val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
685            if (val == null || val.isEmpty()) {
686                int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
687                ival = defaultThrottle;
688            }
689            else {
690                ival = ParamChecker.checkInteger(val, "throttle");
691            }
692            int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
693            float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
694            int maxThrottle = (int) (maxQueue * factor);
695            if (ival > maxThrottle || ival < 1) {
696                ival = maxThrottle;
697            }
698            LOG.debug("max throttle " + ival);
699            coordJob.setMatThrottling(ival);
700    
701            val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
702            if (val == "") {
703                val = Execution.FIFO.toString();
704            }
705            coordJob.setExecution(Execution.valueOf(val));
706            String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
707            ParamChecker.isMember(val, acceptedVals, "execution");
708    
709            // datasets
710            resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
711            // for each data set
712            resolveDataSets(eAppXml);
713            HashMap<String, String> dataNameList = new HashMap<String, String>();
714            resolveIOEvents(eAppXml, dataNameList);
715    
716            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
717                    eAppXml.getNamespace()), evalNofuncs);
718            // TODO: If action or workflow tag is missing, NullPointerException will
719            // occur
720            Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
721                    eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
722            evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
723            if (configElem != null) {
724                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
725                    resolveTagContents("name", propElem, evalData);
726                    // Want to check the data-integrity but don't want to modify the
727                    // XML
728                    // for properties only
729                    Element tmpProp = (Element) propElem.clone();
730                    resolveTagContents("value", tmpProp, evalData);
731                }
732            }
733            resolveSLA(eAppXml, coordJob);
734            return eAppXml;
735        }
736    
737        /**
738         * Resolve SLA events
739         *
740         * @param eAppXml job XML
741         * @param coordJob coordinator job bean
742         * @throws CommandException thrown if failed to resolve sla events
743         */
744        private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
745            Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace()));
746    
747            if (eSla != null) {
748                String slaXml = XmlUtils.prettyPrint(eSla).toString();
749                try {
750                    // EL evaluation
751                    slaXml = evalSla.evaluate(slaXml, String.class);
752                    // Validate against semantic SXD
753                    XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
754                }
755                catch (Exception e) {
756                    throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
757                }
758            }
759        }
760    
761        /**
762         * Resolve input-events/data-in and output-events/data-out tags.
763         *
764         * @param eJob : Job element
765         * @throws CoordinatorJobException thrown if failed to resolve input and output events
766         */
767        @SuppressWarnings("unchecked")
768        private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
769            // Resolving input-events/data-in
770            // Clone the job and don't update anything in the original
771            Element eJob = (Element) eJobOrg.clone();
772            Element inputList = eJob.getChild("input-events", eJob.getNamespace());
773            if (inputList != null) {
774                TreeSet<String> eventNameSet = new TreeSet<String>();
775                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
776                    String dataInName = dataIn.getAttributeValue("name");
777                    dataNameList.put(dataInName, "data-in");
778                    // check whether there is any duplicate data-in name
779                    if (eventNameSet.contains(dataInName)) {
780                        throw new RuntimeException("Duplicate dataIn name " + dataInName);
781                    }
782                    else {
783                        eventNameSet.add(dataInName);
784                    }
785                    resolveTagContents("instance", dataIn, evalInst);
786                    resolveTagContents("start-instance", dataIn, evalInst);
787                    resolveTagContents("end-instance", dataIn, evalInst);
788                }
789            }
790            // Resolving output-events/data-out
791            Element outputList = eJob.getChild("output-events", eJob.getNamespace());
792            if (outputList != null) {
793                TreeSet<String> eventNameSet = new TreeSet<String>();
794                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
795                    String dataOutName = dataOut.getAttributeValue("name");
796                    dataNameList.put(dataOutName, "data-out");
797                    // check whether there is any duplicate data-out name
798                    if (eventNameSet.contains(dataOutName)) {
799                        throw new RuntimeException("Duplicate dataIn name " + dataOutName);
800                    }
801                    else {
802                        eventNameSet.add(dataOutName);
803                    }
804                    resolveTagContents("instance", dataOut, evalInst);
805                }
806            }
807    
808        }
809    
810        /**
811         * Add an attribute into XML element.
812         *
813         * @param attrName :attribute name
814         * @param elem : Element to add attribute
815         * @param value :Value of attribute
816         */
817        private void addAnAttribute(String attrName, Element elem, String value) {
818            elem.setAttribute(attrName, value);
819        }
820    
821        /**
822         * Resolve datasets using job configuration.
823         *
824         * @param eAppXml : Job Element XML
825         * @throws Exception thrown if failed to resolve datasets
826         */
827        @SuppressWarnings("unchecked")
828        private void resolveDataSets(Element eAppXml) throws Exception {
829            Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
830            if (datasetList != null) {
831    
832                List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
833                resolveDataSets(dsElems);
834                resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
835                        eAppXml.getNamespace()), evalNofuncs);
836            }
837        }
838    
839        /**
840         * Resolve datasets using job configuration.
841         *
842         * @param dsElems : Data set XML element.
843         * @throws CoordinatorJobException thrown if failed to resolve datasets
844         */
845        private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
846            for (Element dsElem : dsElems) {
847                // Setting up default TimeUnit and EndOFDuraion
848                evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
849                evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
850    
851                String val = resolveAttribute("frequency", dsElem, evalFreq);
852                int ival = ParamChecker.checkInteger(val, "frequency");
853                ParamChecker.checkGTZero(ival, "frequency");
854                addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
855                        .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
856                addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
857                        .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
858                val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
859                ParamChecker.checkDateOozieTZ(val, "initial-instance");
860                checkInitialInstance(val);
861                val = resolveAttribute("timezone", dsElem, evalNofuncs);
862                ParamChecker.checkTimeZone(val, "timezone");
863                resolveTagContents("uri-template", dsElem, evalNofuncs);
864                resolveTagContents("done-flag", dsElem, evalNofuncs);
865            }
866        }
867    
868        /**
869         * Resolve the content of a tag.
870         *
871         * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
872         * @param elem : Element where the tag exists.
873         * @param eval : EL evealuator
874         * @return Resolved tag content.
875         * @throws CoordinatorJobException thrown if failed to resolve tag content
876         */
877        @SuppressWarnings("unchecked")
878        private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
879            String ret = "";
880            if (elem != null) {
881                for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
882                    if (tagElem != null) {
883                        String updated;
884                        try {
885                            updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
886    
887                        }
888                        catch (Exception e) {
889                            throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
890                        }
891                        tagElem.removeContent();
892                        tagElem.addContent(updated);
893                        ret += updated;
894                    }
895                }
896            }
897            return ret;
898        }
899    
900        /**
901         * Resolve an attribute value.
902         *
903         * @param attrName : Attribute name.
904         * @param elem : XML Element where attribute is defiend
905         * @param eval : ELEvaluator used to resolve
906         * @return Resolved attribute value
907         * @throws CoordinatorJobException thrown if failed to resolve an attribute value
908         */
909        private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
910            Attribute attr = elem.getAttribute(attrName);
911            String val = null;
912            if (attr != null) {
913                try {
914                    val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
915                }
916                catch (Exception e) {
917                    throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
918                }
919                attr.setValue(val);
920            }
921            return val;
922        }
923    
924        /**
925         * Include referred datasets into XML.
926         *
927         * @param resolvedXml : Job XML element.
928         * @param conf : Job configuration
929         * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
930         */
931        @SuppressWarnings("unchecked")
932        protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
933            Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
934            Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
935            List<String> dsList = new ArrayList<String>();
936            if (datasets != null) {
937                for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
938                    String incDSFile = includeElem.getTextTrim();
939                    includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
940                }
941                for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
942                    String dsName = e.getAttributeValue("name");
943                    if (dsList.contains(dsName)) {// Override with this DS
944                        // Remove duplicate
945                        removeDataSet(allDataSets, dsName);
946                    }
947                    else {
948                        dsList.add(dsName);
949                    }
950                    allDataSets.addContent((Element) e.clone());
951                }
952            }
953            insertDataSet(resolvedXml, allDataSets);
954            resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
955        }
956    
957        /**
958         * Include one dataset file.
959         *
960         * @param incDSFile : Include data set filename.
961         * @param dsList :List of dataset names to verify the duplicate.
962         * @param allDataSets : Element that includes all dataset definitions.
963         * @param dsNameSpace : Data set name space
964         * @throws CoordinatorJobException thrown if failed to include one dataset file
965         */
966        @SuppressWarnings("unchecked")
967        private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
968        throws CoordinatorJobException {
969            Element tmpDataSets = null;
970            try {
971                String dsXml = readDefinition(incDSFile);
972                LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
973                tmpDataSets = XmlUtils.parseXml(dsXml);
974            }
975            catch (JDOMException e) {
976                LOG.warn("Error parsing included dataset [{0}].  Message [{1}]", incDSFile, e.getMessage());
977                throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
978            }
979            resolveDataSets(tmpDataSets.getChildren("dataset"));
980            for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
981                String dsName = e.getAttributeValue("name");
982                if (dsList.contains(dsName)) {
983                    throw new RuntimeException("Duplicate Dataset " + dsName);
984                }
985                dsList.add(dsName);
986                Element tmp = (Element) e.clone();
987                // TODO: Don't like to over-write the external/include DS's namespace
988                tmp.setNamespace(dsNameSpace);
989                tmp.getChild("uri-template").setNamespace(dsNameSpace);
990                if (e.getChild("done-flag") != null) {
991                    tmp.getChild("done-flag").setNamespace(dsNameSpace);
992                }
993                allDataSets.addContent(tmp);
994            }
995            // nested include
996            for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
997                String incFile = includeElem.getTextTrim();
998                includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
999            }
1000        }
1001    
1002        /**
1003         * Remove a dataset from a list of dataset.
1004         *
1005         * @param eDatasets : List of dataset
1006         * @param name : Dataset name to be removed.
1007         */
1008        @SuppressWarnings("unchecked")
1009        private static void removeDataSet(Element eDatasets, String name) {
1010            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
1011                if (eDataset.getAttributeValue("name").equals(name)) {
1012                    eDataset.detach();
1013                    return;
1014                }
1015            }
1016            throw new RuntimeException("undefined dataset: " + name);
1017        }
1018    
1019        /**
1020         * Read coordinator definition.
1021         *
1022         * @param appPath application path.
1023         * @return coordinator definition.
1024         * @throws CoordinatorJobException thrown if the definition could not be read.
1025         */
1026        protected String readDefinition(String appPath) throws CoordinatorJobException {
1027            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
1028            // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
1029            try {
1030                URI uri = new URI(appPath);
1031                LOG.debug("user =" + user);
1032                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1033                Configuration fsConf = has.createJobConf(uri.getAuthority());
1034                FileSystem fs = has.createFileSystem(user, uri, fsConf);
1035                Path appDefPath = null;
1036    
1037                // app path could be a directory
1038                Path path = new Path(uri.getPath());
1039                // check file exists for dataset include file, app xml already checked
1040                if (!fs.exists(path)) {
1041                    throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
1042                }
1043                if (!fs.isFile(path)) {
1044                    appDefPath = new Path(path, COORDINATOR_XML_FILE);
1045                } else {
1046                    appDefPath = path;
1047                }
1048    
1049                Reader reader = new InputStreamReader(fs.open(appDefPath));
1050                StringWriter writer = new StringWriter();
1051                IOUtils.copyCharStream(reader, writer);
1052                return writer.toString();
1053            }
1054            catch (IOException ex) {
1055                LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
1056                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1057            }
1058            catch (URISyntaxException ex) {
1059                LOG.warn("URISyException :" + ex.getMessage());
1060                throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
1061            }
1062            catch (HadoopAccessorException ex) {
1063                throw new CoordinatorJobException(ex);
1064            }
1065            catch (Exception ex) {
1066                LOG.warn("Exception :", ex);
1067                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1068            }
1069        }
1070    
1071        /**
1072         * Write a coordinator job into database
1073         *
1074         * @param eJob : XML element of job
1075         * @param coordJob : Coordinator job bean
1076         * @return Job id
1077         * @throws CommandException thrown if unable to save coordinator job to db
1078         */
1079        private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException {
1080            String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
1081            coordJob.setId(jobId);
1082    
1083            coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
1084            coordJob.setCreatedTime(new Date());
1085            coordJob.setUser(conf.get(OozieClient.USER_NAME));
1086            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
1087            coordJob.setGroup(group);
1088            coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
1089            coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
1090            coordJob.setLastActionNumber(0);
1091            coordJob.setLastModifiedTime(new Date());
1092    
1093            if (!dryrun) {
1094                coordJob.setLastModifiedTime(new Date());
1095                try {
1096                    jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
1097                }
1098                catch (JPAExecutorException jpaee) {
1099                    coordJob.setId(null);
1100                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
1101                    throw new CommandException(jpaee);
1102                }
1103            }
1104            return jobId;
1105        }
1106    
1107        /*
1108         * this method checks if the initial-instance specified for a particular
1109           is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC
1110         */
1111        private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
1112            Date initialInstance, givenInstance;
1113            try {
1114                initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
1115                givenInstance = DateUtils.parseDateOozieTZ(val);
1116            }
1117            catch (Exception e) {
1118                throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val +
1119                                                   "' to Date object. ",e);
1120            }
1121            if(givenInstance.compareTo(initialInstance) < 0) {
1122                throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val +
1123                        " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance));
1124            }
1125        }
1126    
1127        /* (non-Javadoc)
1128         * @see org.apache.oozie.command.XCommand#getEntityKey()
1129         */
1130        @Override
1131        public String getEntityKey() {
1132            return null;
1133        }
1134    
1135        /* (non-Javadoc)
1136         * @see org.apache.oozie.command.XCommand#isLockRequired()
1137         */
1138        @Override
1139        protected boolean isLockRequired() {
1140            return false;
1141        }
1142    
1143        /* (non-Javadoc)
1144         * @see org.apache.oozie.command.XCommand#loadState()
1145         */
1146        @Override
1147        protected void loadState() throws CommandException {
1148            jpaService = Services.get().get(JPAService.class);
1149            if (jpaService == null) {
1150                throw new CommandException(ErrorCode.E0610);
1151            }
1152            coordJob = new CoordinatorJobBean();
1153            if (this.bundleId != null) {
1154                // this coord job is created from bundle
1155                coordJob.setBundleId(this.bundleId);
1156                // first use bundle id if submit thru bundle
1157                logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
1158                LogUtils.setLogInfo(logInfo);
1159            }
1160            if (this.coordName != null) {
1161                // this coord job is created from bundle
1162                coordJob.setAppName(this.coordName);
1163            }
1164            setJob(coordJob);
1165    
1166        }
1167    
1168        /* (non-Javadoc)
1169         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
1170         */
1171        @Override
1172        protected void verifyPrecondition() throws CommandException {
1173    
1174        }
1175    
1176        /* (non-Javadoc)
1177         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1178         */
1179        @Override
1180        public void notifyParent() throws CommandException {
1181            // update bundle action
1182            if (coordJob.getBundleId() != null) {
1183                LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1184                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1185                bundleStatusUpdate.call();
1186            }
1187        }
1188    
1189        /* (non-Javadoc)
1190         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
1191         */
1192        @Override
1193        public void updateJob() throws CommandException {
1194        }
1195    
1196        /* (non-Javadoc)
1197         * @see org.apache.oozie.command.TransitionXCommand#getJob()
1198         */
1199        @Override
1200        public Job getJob() {
1201            return coordJob;
1202        }
1203    
1204        @Override
1205        public void performWrites() throws CommandException {
1206        }
1207    }