001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.InputStreamReader;
022    import java.io.Reader;
023    import java.io.StringReader;
024    import java.io.StringWriter;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.ArrayList;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.Iterator;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.Set;
035    import java.util.TreeSet;
036    
037    import javax.xml.transform.stream.StreamSource;
038    import javax.xml.validation.Validator;
039    
040    import org.apache.hadoop.conf.Configuration;
041    import org.apache.hadoop.fs.FileSystem;
042    import org.apache.hadoop.fs.Path;
043    import org.apache.oozie.CoordinatorJobBean;
044    import org.apache.oozie.ErrorCode;
045    import org.apache.oozie.client.CoordinatorJob;
046    import org.apache.oozie.client.Job;
047    import org.apache.oozie.client.OozieClient;
048    import org.apache.oozie.client.CoordinatorJob.Execution;
049    import org.apache.oozie.command.CommandException;
050    import org.apache.oozie.command.SubmitTransitionXCommand;
051    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
052    import org.apache.oozie.coord.CoordELEvaluator;
053    import org.apache.oozie.coord.CoordELFunctions;
054    import org.apache.oozie.coord.CoordinatorJobException;
055    import org.apache.oozie.coord.TimeUnit;
056    import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
057    import org.apache.oozie.executor.jpa.JPAExecutorException;
058    import org.apache.oozie.service.DagXLogInfoService;
059    import org.apache.oozie.service.HadoopAccessorException;
060    import org.apache.oozie.service.HadoopAccessorService;
061    import org.apache.oozie.service.JPAService;
062    import org.apache.oozie.service.SchemaService;
063    import org.apache.oozie.service.Service;
064    import org.apache.oozie.service.Services;
065    import org.apache.oozie.service.UUIDService;
066    import org.apache.oozie.service.SchemaService.SchemaName;
067    import org.apache.oozie.service.UUIDService.ApplicationType;
068    import org.apache.oozie.util.ConfigUtils;
069    import org.apache.oozie.util.DateUtils;
070    import org.apache.oozie.util.ELEvaluator;
071    import org.apache.oozie.util.ELUtils;
072    import org.apache.oozie.util.IOUtils;
073    import org.apache.oozie.util.InstrumentUtils;
074    import org.apache.oozie.util.LogUtils;
075    import org.apache.oozie.util.ParamChecker;
076    import org.apache.oozie.util.ParameterVerifier;
077    import org.apache.oozie.util.ParameterVerifierException;
078    import org.apache.oozie.util.PropertiesUtils;
079    import org.apache.oozie.util.XConfiguration;
080    import org.apache.oozie.util.XLog;
081    import org.apache.oozie.util.XmlUtils;
082    import org.jdom.Attribute;
083    import org.jdom.Element;
084    import org.jdom.JDOMException;
085    import org.jdom.Namespace;
086    import org.xml.sax.SAXException;
087    
088    /**
089     * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB
090     * table.
091     * <p/>
092     * Specifically it performs the following functions: 1. Resolve all the variables or properties using job
093     * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML
094     * at runtime.
095     */
096    public class CoordSubmitXCommand extends SubmitTransitionXCommand {
097    
098        private Configuration conf;
099        private final String authToken;
100        private final String bundleId;
101        private final String coordName;
102        private boolean dryrun;
103        private JPAService jpaService = null;
104        private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
105    
106        public static final String CONFIG_DEFAULT = "coord-config-default.xml";
107        public static final String COORDINATOR_XML_FILE = "coordinator.xml";
108        public final String COORD_INPUT_EVENTS ="input-events";
109        public final String COORD_OUTPUT_EVENTS = "output-events";
110        public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
111        public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";
112    
113        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
114        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
115    
116        private CoordinatorJobBean coordJob = null;
117        /**
118         * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
119         */
120        public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
121    
122        public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";
123    
124        public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";
125    
126        public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
127                + "coord.materialization.throttling.factor";
128    
129        /**
130         * Default MAX timeout in minutes, after which coordinator input check will timeout
131         */
132        public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
133    
134    
135        public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
136    
137        private ELEvaluator evalFreq = null;
138        private ELEvaluator evalNofuncs = null;
139        private ELEvaluator evalData = null;
140        private ELEvaluator evalInst = null;
141        private ELEvaluator evalAction = null;
142        private ELEvaluator evalSla = null;
143    
144        static {
145            String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
146                    PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
147                    PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
148                    PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
149                    PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
150            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
151    
152            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
153            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
154            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
155        }
156    
157        /**
158         * Constructor to create the Coordinator Submit Command.
159         *
160         * @param conf : Configuration for Coordinator job
161         * @param authToken : To be used for authentication
162         */
163        public CoordSubmitXCommand(Configuration conf, String authToken) {
164            super("coord_submit", "coord_submit", 1);
165            this.conf = ParamChecker.notNull(conf, "conf");
166            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
167            this.bundleId = null;
168            this.coordName = null;
169        }
170    
    /**
     * Constructor to create the Coordinator Submit Command on behalf of a bundle job.
     * <p/>
     * The arguments are checked in declaration order, so when several of them are invalid the failure reported
     * is the one for the first offending argument.
     *
     * @param conf : Configuration for Coordinator job, checked with {@code ParamChecker.notNull}
     * @param authToken : To be used for authentication, checked with {@code ParamChecker.notEmpty}
     * @param bundleId : bundle id, checked with {@code ParamChecker.notEmpty}
     * @param coordName : coord name, checked with {@code ParamChecker.notEmpty}
     */
    public CoordSubmitXCommand(Configuration conf, String authToken, String bundleId, String coordName) {
        super("coord_submit", "coord_submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
        this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
        this.coordName = ParamChecker.notEmpty(coordName, "coordName");
    }
186    
    /**
     * Constructor to create the Coordinator Submit Command with an explicit dry-run flag.
     * <p/>
     * Delegates to {@link #CoordSubmitXCommand(Configuration, String)}, so no bundle id or coordinator name is
     * set, and then records the flag.
     *
     * @param dryrun : if true, {@link #submit()} returns the dry-run output instead of queueing materialization
     * @param conf : Configuration for Coordinator job
     * @param authToken : To be used for authentication
     */
    public CoordSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
        this(conf, authToken);
        this.dryrun = dryrun;
    }
198    
199        /* (non-Javadoc)
200         * @see org.apache.oozie.command.XCommand#execute()
201         */
202        @Override
203        protected String submit() throws CommandException {
204            String jobId = null;
205            LOG.info("STARTED Coordinator Submit");
206            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
207    
208            boolean exceptionOccured = false;
209            try {
210                mergeDefaultConfig();
211    
212                String appXml = readAndValidateXml();
213                coordJob.setOrigJobXml(appXml);
214                LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
215    
216                Element eXml = XmlUtils.parseXml(appXml);
217                
218                String appNamespace = readAppNamespace(eXml);
219                coordJob.setAppNamespace(appNamespace);
220    
221                ParameterVerifier.verifyParameters(conf, eXml);
222                
223                appXml = XmlUtils.removeComments(appXml);
224                initEvaluators();
225                Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
226    
227                validateCoordinatorJob();
228    
229                // checking if the coordinator application data input/output events
230                // specify multiple data instance values in erroneous manner
231                checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
232                checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);
233    
234                LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
235    
236                jobId = storeToDB(eJob, coordJob);
237                // log job info for coordinator job
238                LogUtils.setLogInfo(coordJob, logInfo);
239                LOG = XLog.resetPrefix(LOG);
240    
241                if (!dryrun) {
242                    // submit a command to materialize jobs for the next 1 hour (3600 secs)
243                    // so we don't wait 10 mins for the Service to run.
244                    queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
245                }
246                else {
247                    Date startTime = coordJob.getStartTime();
248                    long startTimeMilli = startTime.getTime();
249                    long endTimeMilli = startTimeMilli + (3600 * 1000);
250                    Date jobEndTime = coordJob.getEndTime();
251                    Date endTime = new Date(endTimeMilli);
252                    if (endTime.compareTo(jobEndTime) > 0) {
253                        endTime = jobEndTime;
254                    }
255                    jobId = coordJob.getId();
256                    LOG.info("[" + jobId + "]: Update status to RUNNING");
257                    coordJob.setStatus(Job.Status.RUNNING);
258                    coordJob.setPending();
259                    CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
260                            endTime);
261                    Configuration jobConf = null;
262                    try {
263                        jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
264                    }
265                    catch (IOException e1) {
266                        LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
267                    }
268                    String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
269                    String output = coordJob.getJobXml() + System.getProperty("line.separator")
270                    + "***actions for instance***" + action;
271                    return output;
272                }
273            }
274            catch (JDOMException jex) {
275                exceptionOccured = true;
276                LOG.warn("ERROR: ", jex);
277                throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
278            }
279            catch (CoordinatorJobException cex) {
280                exceptionOccured = true;
281                LOG.warn("ERROR:  ", cex);
282                throw new CommandException(cex);
283            }
284            catch (ParameterVerifierException pex) {
285                exceptionOccured = true;
286                LOG.warn("ERROR: ", pex);
287                throw new CommandException(pex);
288            }
289            catch (IllegalArgumentException iex) {
290                exceptionOccured = true;
291                LOG.warn("ERROR:  ", iex);
292                throw new CommandException(ErrorCode.E1003, iex);
293            }
294            catch (Exception ex) {
295                exceptionOccured = true;
296                LOG.warn("ERROR:  ", ex);
297                throw new CommandException(ErrorCode.E0803, ex);
298            }
299            finally {
300                if (exceptionOccured) {
301                    if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){
302                        coordJob.setStatus(CoordinatorJob.Status.FAILED);
303                        coordJob.resetPending();
304                    }
305                }
306            }
307    
308            LOG.info("ENDED Coordinator Submit jobId=" + jobId);
309            return jobId;
310        }
311    
312        /**
313         * Method that validates values in the definition for correctness. Placeholder to add more.
314         */
315        private void validateCoordinatorJob() {
316            // check if startTime < endTime
317            if (coordJob.getStartTime().after(coordJob.getEndTime())) {
318                throw new IllegalArgumentException("Coordinator Start Time cannot be greater than End Time.");
319            }
320        }
321    
322      /*
323      * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag
324      * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list
325      */
326        private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
327            Element eventsSpec, dataSpec, instance;
328            List<Element> instanceSpecList;
329            Namespace ns = eCoordJob.getNamespace();
330            String instanceValue;
331            eventsSpec = eCoordJob.getChild(eventType, ns);
332            if (eventsSpec != null) {
333                dataSpec = eventsSpec.getChild(dataType, ns);
334                if (dataSpec != null) {
335                    // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
336                    instanceSpecList = dataSpec.getChildren("instance", ns);
337                    Iterator instanceIter = instanceSpecList.iterator();
338                    while(instanceIter.hasNext()) {
339                        instance = ((Element) instanceIter.next());
340                        if(instance.getContentSize() == 0) { //empty string or whitespace
341                            throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
342                        }
343                        instanceValue = instance.getContent(0).toString();
344                        boolean isInvalid = false;
345                        try {
346                            isInvalid = evalAction.checkForExistence(instanceValue, ",");
347                        } catch (Exception e) {
348                            handleELParseException(eventType, dataType, instanceValue);
349                        }
350                        if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
351                            handleExpresionWithMultipleInstances(eventType, dataType, instanceValue);
352                        }
353                    }
354    
355                    // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors
356                    instanceSpecList = dataSpec.getChildren("start-instance", ns);
357                    instanceIter = instanceSpecList.iterator();
358                    while(instanceIter.hasNext()) {
359                        instance = ((Element) instanceIter.next());
360                        if(instance.getContentSize() == 0) { //empty string or whitespace
361                            throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!");
362                        }
363                        instanceValue = instance.getContent(0).toString();
364                        boolean isInvalid = false;
365                        try {
366                            isInvalid = evalAction.checkForExistence(instanceValue, ",");
367                        } catch (Exception e) {
368                            handleELParseException(eventType, dataType, instanceValue);
369                        }
370                        if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0
371                            handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue);
372                        }
373                    }
374    
375                    // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors
376                    instanceSpecList = dataSpec.getChildren("end-instance", ns);
377                    instanceIter = instanceSpecList.iterator();
378                    while(instanceIter.hasNext()) {
379                        instance = ((Element) instanceIter.next());
380                        if(instance.getContentSize() == 0) { //empty string or whitespace
381                            throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!");
382                        }
383                        instanceValue = instance.getContent(0).toString();
384                        boolean isInvalid = false;
385                        try {
386                            isInvalid = evalAction.checkForExistence(instanceValue, ",");
387                        } catch (Exception e) {
388                            handleELParseException(eventType, dataType, instanceValue);
389                        }
390                        if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
391                            handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue);
392                        }
393                    }
394    
395                }
396            }
397        }
398    
399        private void handleELParseException(String eventType, String dataType, String instanceValue)
400                throws CoordinatorJobException {
401            String correctAction = null;
402            if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
403                correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
404            } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
405                correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
406            }
407            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
408                    + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
409        }
410    
411        private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue)
412                throws CoordinatorJobException {
413            String correctAction = null;
414            if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
415                correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
416            } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
417                correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
418            }
419            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
420                    + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
421        }
422    
423        private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue)
424                throws CoordinatorJobException {
425            String correctAction = "Coordinator app definition should not have multiple start-instances";
426            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue
427                    + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction);
428        }
429    
430        private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue)
431                throws CoordinatorJobException {
432            String correctAction = "Coordinator app definition should not have multiple end-instances";
433            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue
434                    + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction);
435        }
436    
437        /**
438         * Read the application XML and validate against coordinator Schema
439         *
440         * @return validated coordinator XML
441         * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
442         */
443        private String readAndValidateXml() throws CoordinatorJobException {
444            String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
445                    OozieClient.COORDINATOR_APP_PATH);
446            String coordXml = readDefinition(appPath);
447            validateXml(coordXml);
448            return coordXml;
449        }
450    
451        /**
452         * Validate against Coordinator XSD file
453         *
454         * @param xmlContent : Input coordinator xml
455         * @throws CoordinatorJobException thrown if unable to validate coordinator xml
456         */
457        private void validateXml(String xmlContent) throws CoordinatorJobException {
458            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
459            Validator validator = schema.newValidator();
460            try {
461                validator.validate(new StreamSource(new StringReader(xmlContent)));
462            }
463            catch (SAXException ex) {
464                LOG.warn("SAXException :", ex);
465                throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
466            }
467            catch (IOException ex) {
468                LOG.warn("IOException :", ex);
469                throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
470            }
471        }
472    
473        /**
474         * Read the application XML schema namespace
475         *
476         * @param coordXmlElement input coordinator xml Element
477         * @return app xml namespace
478         * @throws CoordinatorJobException
479         */
480        private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException {
481            Namespace ns = coordXmlElement.getNamespace();
482            if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
483                throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
484                        + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
485            }
486            if (ns != null) {
487                return ns.getURI();
488            }
489            else {
490                throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
491            }
492        }
493    
494        /**
495         * Merge default configuration with user-defined configuration.
496         *
497         * @throws CommandException thrown if failed to read or merge configurations
498         */
499        protected void mergeDefaultConfig() throws CommandException {
500            Path configDefault = null;
501            try {
502                String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
503                Path coordAppPath = new Path(coordAppPathStr);
504                String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
505                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
506                Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
507                FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);
508    
509                // app path could be a directory
510                if (!fs.isFile(coordAppPath)) {
511                    configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
512                } else {
513                    configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
514                }
515    
516                if (fs.exists(configDefault)) {
517                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
518                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
519                    XConfiguration.injectDefaults(defaultConf, conf);
520                }
521                else {
522                    LOG.info("configDefault Doesn't exist " + configDefault);
523                }
524                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
525    
526                // Resolving all variables in the job properties.
527                // This ensures the Hadoop Configuration semantics is preserved.
528                XConfiguration resolvedVarsConf = new XConfiguration();
529                for (Map.Entry<String, String> entry : conf) {
530                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
531                }
532                conf = resolvedVarsConf;
533            }
534            catch (IOException e) {
535                throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
536                        + configDefault, e);
537            }
538            catch (HadoopAccessorException e) {
539                throw new CommandException(e);
540            }
541            LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
542        }
543    
544        /**
545         * The method resolve all the variables that are defined in configuration. It also include the data set definition
546         * from dataset file into XML.
547         *
548         * @param appXml : Original job XML
549         * @param conf : Configuration of the job
550         * @param coordJob : Coordinator job bean to be populated.
551         * @return Resolved and modified job XML element.
552         * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
553         * @throws Exception thrown if failed to resolve basic entities or include referred datasets
554         */
555        public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
556        throws CoordinatorJobException, Exception {
557            Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
558            includeDataSets(basicResolvedApp, conf);
559            return basicResolvedApp;
560        }
561    
562        /**
563         * Insert data set into data-in and data-out tags.
564         *
565         * @param eAppXml : coordinator application XML
566         * @param eDatasets : DataSet XML
567         */
568        @SuppressWarnings("unchecked")
569        private void insertDataSet(Element eAppXml, Element eDatasets) {
570            // Adding DS definition in the coordinator XML
571            Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
572            if (inputList != null) {
573                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
574                    Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
575                    dataIn.getContent().add(0, eDataset);
576                }
577            }
578            Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
579            if (outputList != null) {
580                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
581                    Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
582                    dataOut.getContent().add(0, eDataset);
583                }
584            }
585        }
586    
587        /**
588         * Find a specific dataset from a list of Datasets.
589         *
590         * @param eDatasets : List of data sets
591         * @param name : queried data set name
592         * @return one Dataset element. otherwise throw Exception
593         */
594        @SuppressWarnings("unchecked")
595        private static Element findDataSet(Element eDatasets, String name) {
596            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
597                if (eDataset.getAttributeValue("name").equals(name)) {
598                    eDataset = (Element) eDataset.clone();
599                    eDataset.detach();
600                    return eDataset;
601                }
602            }
603            throw new RuntimeException("undefined dataset: " + name);
604        }
605    
606        /**
607         * Initialize all the required EL Evaluators.
608         */
609        protected void initEvaluators() {
610            evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
611            evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
612            evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
613            evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
614            evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
615        }
616    
617        /**
618         * Resolve basic entities using job Configuration.
619         *
620         * @param conf :Job configuration
621         * @param appXml : Original job XML
622         * @param coordJob : Coordinator job bean to be populated.
623         * @return Resolved job XML element.
624         * @throws CoordinatorJobException thrown if failed to resolve basic entities
625         * @throws Exception thrown if failed to resolve basic entities
626         */
627        @SuppressWarnings("unchecked")
628        protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
629        throws CoordinatorJobException, Exception {
630            Element eAppXml = XmlUtils.parseXml(appXml);
631            // job's main attributes
632            // frequency
633            String val = resolveAttribute("frequency", eAppXml, evalFreq);
634            int ival = ParamChecker.checkInteger(val, "frequency");
635            ParamChecker.checkGTZero(ival, "frequency");
636            coordJob.setFrequency(ival);
637            TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
638                    .getVariable("timeunit"));
639            addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
640            // TimeUnit
641            coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
642            // End Of Duration
643            tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
644                    .getVariable("endOfDuration"));
645            addAnAttribute("end_of_duration", eAppXml, tmp.toString());
646            // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
647    
648            // Application name
649            if (this.coordName == null) {
650                String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf);
651                coordJob.setAppName(name);
652            }
653            else {
654                // this coord job is created from bundle
655                coordJob.setAppName(this.coordName);
656            }
657    
658            // start time
659            val = resolveAttribute("start", eAppXml, evalNofuncs);
660            ParamChecker.checkDateOozieTZ(val, "start");
661            coordJob.setStartTime(DateUtils.parseDateOozieTZ(val));
662            // end time
663            val = resolveAttribute("end", eAppXml, evalNofuncs);
664            ParamChecker.checkDateOozieTZ(val, "end");
665            coordJob.setEndTime(DateUtils.parseDateOozieTZ(val));
666            // Time zone
667            val = resolveAttribute("timezone", eAppXml, evalNofuncs);
668            ParamChecker.checkTimeZone(val, "timezone");
669            coordJob.setTimeZone(val);
670    
671            // controls
672            val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
673            if (val == "") {
674                val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
675            }
676    
677            ival = ParamChecker.checkInteger(val, "timeout");
678            if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
679                ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
680            }
681            coordJob.setTimeout(ival);
682    
683            val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
684            if (val == null || val.isEmpty()) {
685                val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1");
686            }
687            ival = ParamChecker.checkInteger(val, "concurrency");
688            coordJob.setConcurrency(ival);
689    
690            val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
691            if (val == null || val.isEmpty()) {
692                int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
693                ival = defaultThrottle;
694            }
695            else {
696                ival = ParamChecker.checkInteger(val, "throttle");
697            }
698            int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
699            float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
700            int maxThrottle = (int) (maxQueue * factor);
701            if (ival > maxThrottle || ival < 1) {
702                ival = maxThrottle;
703            }
704            LOG.debug("max throttle " + ival);
705            coordJob.setMatThrottling(ival);
706    
707            val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
708            if (val == "") {
709                val = Execution.FIFO.toString();
710            }
711            coordJob.setExecution(Execution.valueOf(val));
712            String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
713            ParamChecker.isMember(val, acceptedVals, "execution");
714    
715            // datasets
716            resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
717            // for each data set
718            resolveDataSets(eAppXml);
719            HashMap<String, String> dataNameList = new HashMap<String, String>();
720            resolveIOEvents(eAppXml, dataNameList);
721    
722            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
723                    eAppXml.getNamespace()), evalNofuncs);
724            // TODO: If action or workflow tag is missing, NullPointerException will
725            // occur
726            Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
727                    eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
728            evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
729            if (configElem != null) {
730                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
731                    resolveTagContents("name", propElem, evalData);
732                    // Want to check the data-integrity but don't want to modify the
733                    // XML
734                    // for properties only
735                    Element tmpProp = (Element) propElem.clone();
736                    resolveTagContents("value", tmpProp, evalData);
737                }
738            }
739            resolveSLA(eAppXml, coordJob);
740            return eAppXml;
741        }
742    
743        /**
744         * Resolve SLA events
745         *
746         * @param eAppXml job XML
747         * @param coordJob coordinator job bean
748         * @throws CommandException thrown if failed to resolve sla events
749         */
750        private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
751            Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info",
752                    Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
753    
754            if (eSla != null) {
755                String slaXml = XmlUtils.prettyPrint(eSla).toString();
756                try {
757                    // EL evaluation
758                    slaXml = evalSla.evaluate(slaXml, String.class);
759                    // Validate against semantic SXD
760                    XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
761                }
762                catch (Exception e) {
763                    throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
764                }
765            }
766        }
767    
768        /**
769         * Resolve input-events/data-in and output-events/data-out tags.
770         *
771         * @param eJob : Job element
772         * @throws CoordinatorJobException thrown if failed to resolve input and output events
773         */
774        @SuppressWarnings("unchecked")
775        private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
776            // Resolving input-events/data-in
777            // Clone the job and don't update anything in the original
778            Element eJob = (Element) eJobOrg.clone();
779            Element inputList = eJob.getChild("input-events", eJob.getNamespace());
780            if (inputList != null) {
781                TreeSet<String> eventNameSet = new TreeSet<String>();
782                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
783                    String dataInName = dataIn.getAttributeValue("name");
784                    dataNameList.put(dataInName, "data-in");
785                    // check whether there is any duplicate data-in name
786                    if (eventNameSet.contains(dataInName)) {
787                        throw new RuntimeException("Duplicate dataIn name " + dataInName);
788                    }
789                    else {
790                        eventNameSet.add(dataInName);
791                    }
792                    resolveTagContents("instance", dataIn, evalInst);
793                    resolveTagContents("start-instance", dataIn, evalInst);
794                    resolveTagContents("end-instance", dataIn, evalInst);
795                }
796            }
797            // Resolving output-events/data-out
798            Element outputList = eJob.getChild("output-events", eJob.getNamespace());
799            if (outputList != null) {
800                TreeSet<String> eventNameSet = new TreeSet<String>();
801                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
802                    String dataOutName = dataOut.getAttributeValue("name");
803                    dataNameList.put(dataOutName, "data-out");
804                    // check whether there is any duplicate data-out name
805                    if (eventNameSet.contains(dataOutName)) {
806                        throw new RuntimeException("Duplicate dataIn name " + dataOutName);
807                    }
808                    else {
809                        eventNameSet.add(dataOutName);
810                    }
811                    resolveTagContents("instance", dataOut, evalInst);
812                }
813            }
814    
815        }
816    
817        /**
818         * Add an attribute into XML element.
819         *
820         * @param attrName :attribute name
821         * @param elem : Element to add attribute
822         * @param value :Value of attribute
823         */
824        private void addAnAttribute(String attrName, Element elem, String value) {
825            elem.setAttribute(attrName, value);
826        }
827    
828        /**
829         * Resolve datasets using job configuration.
830         *
831         * @param eAppXml : Job Element XML
832         * @throws Exception thrown if failed to resolve datasets
833         */
834        @SuppressWarnings("unchecked")
835        private void resolveDataSets(Element eAppXml) throws Exception {
836            Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
837            if (datasetList != null) {
838    
839                List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
840                resolveDataSets(dsElems);
841                resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
842                        eAppXml.getNamespace()), evalNofuncs);
843            }
844        }
845    
846        /**
847         * Resolve datasets using job configuration.
848         *
849         * @param dsElems : Data set XML element.
850         * @throws CoordinatorJobException thrown if failed to resolve datasets
851         */
852        private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
853            for (Element dsElem : dsElems) {
854                // Setting up default TimeUnit and EndOFDuraion
855                evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
856                evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
857    
858                String val = resolveAttribute("frequency", dsElem, evalFreq);
859                int ival = ParamChecker.checkInteger(val, "frequency");
860                ParamChecker.checkGTZero(ival, "frequency");
861                addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
862                        .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
863                addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
864                        .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
865                val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
866                ParamChecker.checkDateOozieTZ(val, "initial-instance");
867                checkInitialInstance(val);
868                val = resolveAttribute("timezone", dsElem, evalNofuncs);
869                ParamChecker.checkTimeZone(val, "timezone");
870                resolveTagContents("uri-template", dsElem, evalNofuncs);
871                resolveTagContents("done-flag", dsElem, evalNofuncs);
872            }
873        }
874    
875        /**
876         * Resolve the content of a tag.
877         *
878         * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
879         * @param elem : Element where the tag exists.
880         * @param eval : EL evealuator
881         * @return Resolved tag content.
882         * @throws CoordinatorJobException thrown if failed to resolve tag content
883         */
884        @SuppressWarnings("unchecked")
885        private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
886            String ret = "";
887            if (elem != null) {
888                for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
889                    if (tagElem != null) {
890                        String updated;
891                        try {
892                            updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
893    
894                        }
895                        catch (Exception e) {
896                            throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
897                        }
898                        tagElem.removeContent();
899                        tagElem.addContent(updated);
900                        ret += updated;
901                    }
902                }
903            }
904            return ret;
905        }
906    
907        /**
908         * Resolve an attribute value.
909         *
910         * @param attrName : Attribute name.
911         * @param elem : XML Element where attribute is defiend
912         * @param eval : ELEvaluator used to resolve
913         * @return Resolved attribute value
914         * @throws CoordinatorJobException thrown if failed to resolve an attribute value
915         */
916        private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
917            Attribute attr = elem.getAttribute(attrName);
918            String val = null;
919            if (attr != null) {
920                try {
921                    val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
922                }
923                catch (Exception e) {
924                    throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
925                }
926                attr.setValue(val);
927            }
928            return val;
929        }
930    
931        /**
932         * Include referred datasets into XML.
933         *
934         * @param resolvedXml : Job XML element.
935         * @param conf : Job configuration
936         * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
937         */
938        @SuppressWarnings("unchecked")
939        protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
940            Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
941            Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
942            List<String> dsList = new ArrayList<String>();
943            if (datasets != null) {
944                for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
945                    String incDSFile = includeElem.getTextTrim();
946                    includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
947                }
948                for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
949                    String dsName = e.getAttributeValue("name");
950                    if (dsList.contains(dsName)) {// Override with this DS
951                        // Remove old DS
952                        removeDataSet(allDataSets, dsName);
953                    }
954                    else {
955                        dsList.add(dsName);
956                    }
957                    allDataSets.addContent((Element) e.clone());
958                }
959            }
960            insertDataSet(resolvedXml, allDataSets);
961            resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
962        }
963    
964        /**
965         * Include one dataset file.
966         *
967         * @param incDSFile : Include data set filename.
968         * @param dsList :List of dataset names to verify the duplicate.
969         * @param allDataSets : Element that includes all dataset definitions.
970         * @param dsNameSpace : Data set name space
971         * @throws CoordinatorJobException thrown if failed to include one dataset file
972         */
973        @SuppressWarnings("unchecked")
974        private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
975        throws CoordinatorJobException {
976            Element tmpDataSets = null;
977            try {
978                String dsXml = readDefinition(incDSFile);
979                LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
980                tmpDataSets = XmlUtils.parseXml(dsXml);
981            }
982            catch (JDOMException e) {
983                LOG.warn("Error parsing included dataset [{0}].  Message [{1}]", incDSFile, e.getMessage());
984                throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
985            }
986            resolveDataSets(tmpDataSets.getChildren("dataset"));
987            for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
988                String dsName = e.getAttributeValue("name");
989                if (dsList.contains(dsName)) {
990                    throw new RuntimeException("Duplicate Dataset " + dsName);
991                }
992                dsList.add(dsName);
993                Element tmp = (Element) e.clone();
994                // TODO: Don't like to over-write the external/include DS's namespace
995                tmp.setNamespace(dsNameSpace);
996                tmp.getChild("uri-template").setNamespace(dsNameSpace);
997                if (e.getChild("done-flag") != null) {
998                    tmp.getChild("done-flag").setNamespace(dsNameSpace);
999                }
1000                allDataSets.addContent(tmp);
1001            }
1002            // nested include
1003            for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
1004                String incFile = includeElem.getTextTrim();
1005                includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
1006            }
1007        }
1008    
1009        /**
1010         * Remove a dataset from a list of dataset.
1011         *
1012         * @param eDatasets : List of dataset
1013         * @param name : Dataset name to be removed.
1014         */
1015        @SuppressWarnings("unchecked")
1016        private static void removeDataSet(Element eDatasets, String name) {
1017            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
1018                if (eDataset.getAttributeValue("name").equals(name)) {
1019                    eDataset.detach();
1020                }
1021            }
1022            throw new RuntimeException("undefined dataset: " + name);
1023        }
1024    
1025        /**
1026         * Read coordinator definition.
1027         *
1028         * @param appPath application path.
1029         * @return coordinator definition.
1030         * @throws CoordinatorJobException thrown if the definition could not be read.
1031         */
1032        protected String readDefinition(String appPath) throws CoordinatorJobException {
1033            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
1034            // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
1035            try {
1036                URI uri = new URI(appPath);
1037                LOG.debug("user =" + user);
1038                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1039                Configuration fsConf = has.createJobConf(uri.getAuthority());
1040                FileSystem fs = has.createFileSystem(user, uri, fsConf);
1041                Path appDefPath = null;
1042    
1043                // app path could be a directory
1044                Path path = new Path(uri.getPath());
1045                // check file exists for dataset include file, app xml already checked
1046                if (!fs.exists(path)) {
1047                    throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
1048                }
1049                if (!fs.isFile(path)) {
1050                    appDefPath = new Path(path, COORDINATOR_XML_FILE);
1051                } else {
1052                    appDefPath = path;
1053                }
1054    
1055                Reader reader = new InputStreamReader(fs.open(appDefPath));
1056                StringWriter writer = new StringWriter();
1057                IOUtils.copyCharStream(reader, writer);
1058                return writer.toString();
1059            }
1060            catch (IOException ex) {
1061                LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
1062                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1063            }
1064            catch (URISyntaxException ex) {
1065                LOG.warn("URISyException :" + ex.getMessage());
1066                throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
1067            }
1068            catch (HadoopAccessorException ex) {
1069                throw new CoordinatorJobException(ex);
1070            }
1071            catch (Exception ex) {
1072                LOG.warn("Exception :", ex);
1073                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1074            }
1075        }
1076    
1077        /**
1078         * Write a coordinator job into database
1079         *
1080         * @param eJob : XML element of job
1081         * @param coordJob : Coordinator job bean
1082         * @return Job id
1083         * @throws CommandException thrown if unable to save coordinator job to db
1084         */
1085        private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException {
1086            String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
1087            coordJob.setId(jobId);
1088            coordJob.setAuthToken(this.authToken);
1089    
1090            coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
1091            coordJob.setCreatedTime(new Date());
1092            coordJob.setUser(conf.get(OozieClient.USER_NAME));
1093            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
1094            coordJob.setGroup(group);
1095            coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
1096            coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
1097            coordJob.setLastActionNumber(0);
1098            coordJob.setLastModifiedTime(new Date());
1099    
1100            if (!dryrun) {
1101                coordJob.setLastModifiedTime(new Date());
1102                try {
1103                    jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
1104                }
1105                catch (JPAExecutorException je) {
1106                    throw new CommandException(je);
1107                }
1108            }
1109            return jobId;
1110        }
1111    
1112        /*
1113         * this method checks if the initial-instance specified for a particular
1114           is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC
1115         */
1116        private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
1117            Date initialInstance, givenInstance;
1118            try {
1119                initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
1120                givenInstance = DateUtils.parseDateOozieTZ(val);
1121            }
1122            catch (Exception e) {
1123                throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val +
1124                                                   "' to Date object. ",e);
1125            }
1126            if(givenInstance.compareTo(initialInstance) < 0) {
1127                throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val +
1128                        " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance));
1129            }
1130        }
1131    
    /*
     * (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     *
     * This implementation always returns null.
     */
    @Override
    public String getEntityKey() {
        return null;
    }
1139    
    /*
     * (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     *
     * This implementation always returns false.
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }
1147    
1148        /* (non-Javadoc)
1149         * @see org.apache.oozie.command.XCommand#loadState()
1150         */
1151        @Override
1152        protected void loadState() throws CommandException {
1153            jpaService = Services.get().get(JPAService.class);
1154            if (jpaService == null) {
1155                throw new CommandException(ErrorCode.E0610);
1156            }
1157            coordJob = new CoordinatorJobBean();
1158            if (this.bundleId != null) {
1159                // this coord job is created from bundle
1160                coordJob.setBundleId(this.bundleId);
1161                // first use bundle id if submit thru bundle
1162                logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
1163                LogUtils.setLogInfo(logInfo);
1164            }
1165            if (this.coordName != null) {
1166                // this coord job is created from bundle
1167                coordJob.setAppName(this.coordName);
1168            }
1169            setJob(coordJob);
1170    
1171        }
1172    
    /*
     * (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     *
     * This implementation performs no checks.
     */
    @Override
    protected void verifyPrecondition() throws CommandException {

    }
1180    
1181        /* (non-Javadoc)
1182         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1183         */
1184        @Override
1185        public void notifyParent() throws CommandException {
1186            // update bundle action
1187            if (coordJob.getBundleId() != null) {
1188                LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1189                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1190                bundleStatusUpdate.call();
1191            }
1192        }
1193    
    /*
     * (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     *
     * This implementation does nothing.
     */
    @Override
    public void updateJob() throws CommandException {
    }
1200    
    /*
     * (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     *
     * Returns the coordinator job bean held by this command.
     */
    @Override
    public Job getJob() {
        return coordJob;
    }
1208    
    // This implementation does nothing.
    @Override
    public void performWrites() throws CommandException {
    }
1212    }