001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.InputStreamReader;
022    import java.io.Reader;
023    import java.io.StringReader;
024    import java.io.StringWriter;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.text.DateFormat;
028    import java.text.SimpleDateFormat;
029    import java.util.ArrayList;
030    import java.util.Date;
031    import java.util.HashMap;
032    import java.util.HashSet;
033    import java.util.Iterator;
034    import java.util.List;
035    import java.util.Map;
036    import java.util.Set;
037    import java.util.TimeZone;
038    import java.util.TreeSet;
039    
040    import javax.xml.transform.stream.StreamSource;
041    import javax.xml.validation.Validator;
042    
043    import org.apache.hadoop.conf.Configuration;
044    import org.apache.hadoop.fs.FileSystem;
045    import org.apache.hadoop.fs.Path;
046    import org.apache.oozie.CoordinatorJobBean;
047    import org.apache.oozie.ErrorCode;
048    import org.apache.oozie.client.CoordinatorJob;
049    import org.apache.oozie.client.Job;
050    import org.apache.oozie.client.OozieClient;
051    import org.apache.oozie.client.CoordinatorJob.Execution;
052    import org.apache.oozie.command.CommandException;
053    import org.apache.oozie.command.SubmitTransitionXCommand;
054    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
055    import org.apache.oozie.coord.CoordELEvaluator;
056    import org.apache.oozie.coord.CoordELFunctions;
057    import org.apache.oozie.coord.CoordinatorJobException;
058    import org.apache.oozie.coord.TimeUnit;
059    import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
060    import org.apache.oozie.executor.jpa.JPAExecutorException;
061    import org.apache.oozie.service.DagXLogInfoService;
062    import org.apache.oozie.service.HadoopAccessorException;
063    import org.apache.oozie.service.HadoopAccessorService;
064    import org.apache.oozie.service.JPAService;
065    import org.apache.oozie.service.SchemaService;
066    import org.apache.oozie.service.Service;
067    import org.apache.oozie.service.Services;
068    import org.apache.oozie.service.UUIDService;
069    import org.apache.oozie.service.WorkflowAppService;
070    import org.apache.oozie.service.SchemaService.SchemaName;
071    import org.apache.oozie.service.UUIDService.ApplicationType;
072    import org.apache.oozie.util.ConfigUtils;
073    import org.apache.oozie.util.DateUtils;
074    import org.apache.oozie.util.ELEvaluator;
075    import org.apache.oozie.util.IOUtils;
076    import org.apache.oozie.util.InstrumentUtils;
077    import org.apache.oozie.util.LogUtils;
078    import org.apache.oozie.util.ParamChecker;
079    import org.apache.oozie.util.PropertiesUtils;
080    import org.apache.oozie.util.XConfiguration;
081    import org.apache.oozie.util.XLog;
082    import org.apache.oozie.util.XmlUtils;
083    import org.jdom.Attribute;
084    import org.jdom.Element;
085    import org.jdom.JDOMException;
086    import org.jdom.Namespace;
087    import org.xml.sax.SAXException;
088    
089    /**
090     * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB
091     * table.
092     * <p/>
093     * Specifically it performs the following functions: 1. Resolve all the variables or properties using job
094     * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML
095     * at runtime.
096     */
097    public class CoordSubmitXCommand extends SubmitTransitionXCommand {
098    
099        private Configuration conf;
100        private final String authToken;
101        private final String bundleId;
102        private final String coordName;
103        private boolean dryrun;
104        private JPAService jpaService = null;
105        private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
106    
107        public static final String CONFIG_DEFAULT = "coord-config-default.xml";
108        public static final String COORDINATOR_XML_FILE = "coordinator.xml";
109        public final String COORD_INPUT_EVENTS ="input-events";
110        public final String COORD_OUTPUT_EVENTS = "output-events";
111        public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
112        public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";
113    
114        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
115        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
116    
117        private CoordinatorJobBean coordJob = null;
118        /**
119         * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
120         */
121        public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
122    
123        public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";
124    
125        public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";
126    
127        public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
128                + "coord.materialization.throttling.factor";
129    
130        /**
131         * Default MAX timeout in minutes, after which coordinator input check will timeout
132         */
133        public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
134    
135    
136        public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
137    
138        private ELEvaluator evalFreq = null;
139        private ELEvaluator evalNofuncs = null;
140        private ELEvaluator evalData = null;
141        private ELEvaluator evalInst = null;
142        private ELEvaluator evalAction = null;
143        private ELEvaluator evalSla = null;
144    
145        static {
146            String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
147                    PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
148                    PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
149                    PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
150                    PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
151            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
152    
153            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
154            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
155            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
156        }
157    
158        /**
159         * Constructor to create the Coordinator Submit Command.
160         *
161         * @param conf : Configuration for Coordinator job
162         * @param authToken : To be used for authentication
163         */
164        public CoordSubmitXCommand(Configuration conf, String authToken) {
165            super("coord_submit", "coord_submit", 1);
166            this.conf = ParamChecker.notNull(conf, "conf");
167            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
168            this.bundleId = null;
169            this.coordName = null;
170        }
171    
172        /**
173         * Constructor to create the Coordinator Submit Command by bundle job.
174         *
175         * @param conf : Configuration for Coordinator job
176         * @param authToken : To be used for authentication
177         * @param bundleId : bundle id
178         * @param coordName : coord name
179         */
180        public CoordSubmitXCommand(Configuration conf, String authToken, String bundleId, String coordName) {
181            super("coord_submit", "coord_submit", 1);
182            this.conf = ParamChecker.notNull(conf, "conf");
183            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
184            this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
185            this.coordName = ParamChecker.notEmpty(coordName, "coordName");
186        }
187    
188        /**
189         * Constructor to create the Coordinator Submit Command.
190         *
191         * @param dryrun : if dryrun
192         * @param conf : Configuration for Coordinator job
193         * @param authToken : To be used for authentication
194         */
195        public CoordSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
196            this(conf, authToken);
197            this.dryrun = dryrun;
198        }
199    
200        /* (non-Javadoc)
201         * @see org.apache.oozie.command.XCommand#execute()
202         */
203        @Override
204        protected String submit() throws CommandException {
205            String jobId = null;
206            LOG.info("STARTED Coordinator Submit");
207            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
208    
209            boolean exceptionOccured = false;
210            try {
211                mergeDefaultConfig();
212    
213                String appXml = readAndValidateXml();
214                coordJob.setOrigJobXml(appXml);
215                LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
216    
217                String appNamespace = readAppNamespace(appXml);
218                coordJob.setAppNamespace(appNamespace);
219    
220                appXml = XmlUtils.removeComments(appXml);
221                initEvaluators();
222                Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
223    
224                // checking if the coordinator application data input/output events
225                // specify multiple data instance values in erroneous manner
226                checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
227                checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);
228    
229                LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
230    
231                jobId = storeToDB(eJob, coordJob);
232                // log job info for coordinator job
233                LogUtils.setLogInfo(coordJob, logInfo);
234                LOG = XLog.resetPrefix(LOG);
235    
236                if (!dryrun) {
237                    // submit a command to materialize jobs for the next 1 hour (3600 secs)
238                    // so we don't wait 10 mins for the Service to run.
239                    queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
240                }
241                else {
242                    Date startTime = coordJob.getStartTime();
243                    long startTimeMilli = startTime.getTime();
244                    long endTimeMilli = startTimeMilli + (3600 * 1000);
245                    Date jobEndTime = coordJob.getEndTime();
246                    Date endTime = new Date(endTimeMilli);
247                    if (endTime.compareTo(jobEndTime) > 0) {
248                        endTime = jobEndTime;
249                    }
250                    jobId = coordJob.getId();
251                    LOG.info("[" + jobId + "]: Update status to RUNNING");
252                    coordJob.setStatus(Job.Status.RUNNING);
253                    coordJob.setPending();
254                    CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
255                            endTime);
256                    Configuration jobConf = null;
257                    try {
258                        jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
259                    }
260                    catch (IOException e1) {
261                        LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
262                    }
263                    String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
264                    String output = coordJob.getJobXml() + System.getProperty("line.separator")
265                    + "***actions for instance***" + action;
266                    return output;
267                }
268            }
269            catch (CoordinatorJobException cex) {
270                exceptionOccured = true;
271                LOG.warn("ERROR:  ", cex);
272                throw new CommandException(cex);
273            }
274            catch (IllegalArgumentException iex) {
275                exceptionOccured = true;
276                LOG.warn("ERROR:  ", iex);
277                throw new CommandException(ErrorCode.E1003, iex);
278            }
279            catch (Exception ex) {
280                exceptionOccured = true;
281                LOG.warn("ERROR:  ", ex);
282                throw new CommandException(ErrorCode.E0803, ex);
283            }
284            finally {
285                if (exceptionOccured) {
286                    if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){
287                        coordJob.setStatus(CoordinatorJob.Status.FAILED);
288                        coordJob.resetPending();
289                    }
290                }
291            }
292    
293            LOG.info("ENDED Coordinator Submit jobId=" + jobId);
294            return jobId;
295        }
296    
297        /*
298         * Check against multiple data instance values inside a single <instance> tag
299         * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list
300         */
301        private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
302            Element eventsSpec, dataSpec, instance;
303            List<Element> instanceSpecList;
304            Namespace ns = eCoordJob.getNamespace();
305            String instanceValue;
306            eventsSpec = eCoordJob.getChild(eventType, ns);
307            if (eventsSpec != null) {
308                dataSpec = eventsSpec.getChild(dataType, ns);
309                if (dataSpec != null) {
310                    // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
311                    instanceSpecList = dataSpec.getChildren("instance", ns);
312                    Iterator instanceIter = instanceSpecList.iterator();
313                    while(instanceIter.hasNext()) {
314                        instance = ((Element) instanceIter.next());
315                        if(instance.getContentSize() == 0) { //empty string or whitespace
316                            throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
317                        }
318                        instanceValue = instance.getContent(0).toString();
319                        boolean isInvalid = false;
320                        try {
321                            isInvalid = evalAction.checkForExistence(instanceValue, ",");
322                        } catch (Exception e) {
323                            handleELParseException(eventType, dataType, instanceValue);
324                        }
325                        if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
326                            handleExpresionWithMultipleInstances(eventType, dataType, instanceValue);
327                        }
328                    }
329                }
330            }
331        }
332    
333        private void handleELParseException(String eventType, String dataType, String instanceValue)
334                throws CoordinatorJobException {
335            String correctAction = null;
336            if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
337                correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
338            } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
339                correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
340            }
341            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
342                    + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
343        }
344    
345        private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue)
346                throws CoordinatorJobException {
347            String correctAction = null;
348            if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
349                correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
350            } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
351                correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
352            }
353            throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
354                    + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
355        }
356    
357        /**
358         * Read the application XML and validate against coordinator Schema
359         *
360         * @return validated coordinator XML
361         * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
362         */
363        private String readAndValidateXml() throws CoordinatorJobException {
364            String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
365                    OozieClient.COORDINATOR_APP_PATH);
366            String coordXml = readDefinition(appPath);
367            validateXml(coordXml);
368            return coordXml;
369        }
370    
371        /**
372         * Validate against Coordinator XSD file
373         *
374         * @param xmlContent : Input coordinator xml
375         * @throws CoordinatorJobException thrown if unable to validate coordinator xml
376         */
377        private void validateXml(String xmlContent) throws CoordinatorJobException {
378            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
379            Validator validator = schema.newValidator();
380            try {
381                validator.validate(new StreamSource(new StringReader(xmlContent)));
382            }
383            catch (SAXException ex) {
384                LOG.warn("SAXException :", ex);
385                throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
386            }
387            catch (IOException ex) {
388                LOG.warn("IOException :", ex);
389                throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
390            }
391        }
392    
393        /**
394         * Read the application XML schema namespace
395         *
396         * @param xmlContent input coordinator xml
397         * @return app xml namespace
398         * @throws CoordinatorJobException
399         */
400        private String readAppNamespace(String xmlContent) throws CoordinatorJobException {
401            try {
402                Element coordXmlElement = XmlUtils.parseXml(xmlContent);
403                Namespace ns = coordXmlElement.getNamespace();
404                if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
405                    throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
406                            + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
407                }
408                if (ns != null) {
409                    return ns.getURI();
410                }
411                else {
412                    throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
413                }
414            }
415            catch (JDOMException ex) {
416                LOG.warn("JDOMException :", ex);
417                throw new CoordinatorJobException(ErrorCode.E0700, ex.getMessage(), ex);
418            }
419        }
420    
421        /**
422         * Merge default configuration with user-defined configuration.
423         *
424         * @throws CommandException thrown if failed to read or merge configurations
425         */
426        protected void mergeDefaultConfig() throws CommandException {
427            Path configDefault = null;
428            try {
429                String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
430                Path coordAppPath = new Path(coordAppPathStr);
431                String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
432                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
433                Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
434                FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);
435    
436                // app path could be a directory
437                if (!fs.isFile(coordAppPath)) {
438                    configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
439                } else {
440                    configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
441                }
442    
443                if (fs.exists(configDefault)) {
444                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
445                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
446                    XConfiguration.injectDefaults(defaultConf, conf);
447                }
448                else {
449                    LOG.info("configDefault Doesn't exist " + configDefault);
450                }
451                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
452    
453                // Resolving all variables in the job properties.
454                // This ensures the Hadoop Configuration semantics is preserved.
455                XConfiguration resolvedVarsConf = new XConfiguration();
456                for (Map.Entry<String, String> entry : conf) {
457                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
458                }
459                conf = resolvedVarsConf;
460            }
461            catch (IOException e) {
462                throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
463                        + configDefault, e);
464            }
465            catch (HadoopAccessorException e) {
466                throw new CommandException(e);
467            }
468            LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
469        }
470    
471        /**
472         * The method resolve all the variables that are defined in configuration. It also include the data set definition
473         * from dataset file into XML.
474         *
475         * @param appXml : Original job XML
476         * @param conf : Configuration of the job
477         * @param coordJob : Coordinator job bean to be populated.
478         * @return Resolved and modified job XML element.
479         * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
480         * @throws Exception thrown if failed to resolve basic entities or include referred datasets
481         */
482        public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
483        throws CoordinatorJobException, Exception {
484            Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
485            includeDataSets(basicResolvedApp, conf);
486            return basicResolvedApp;
487        }
488    
489        /**
490         * Insert data set into data-in and data-out tags.
491         *
492         * @param eAppXml : coordinator application XML
493         * @param eDatasets : DataSet XML
494         */
495        @SuppressWarnings("unchecked")
496        private void insertDataSet(Element eAppXml, Element eDatasets) {
497            // Adding DS definition in the coordinator XML
498            Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
499            if (inputList != null) {
500                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
501                    Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
502                    dataIn.getContent().add(0, eDataset);
503                }
504            }
505            Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
506            if (outputList != null) {
507                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
508                    Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
509                    dataOut.getContent().add(0, eDataset);
510                }
511            }
512        }
513    
514        /**
515         * Find a specific dataset from a list of Datasets.
516         *
517         * @param eDatasets : List of data sets
518         * @param name : queried data set name
519         * @return one Dataset element. otherwise throw Exception
520         */
521        @SuppressWarnings("unchecked")
522        private static Element findDataSet(Element eDatasets, String name) {
523            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
524                if (eDataset.getAttributeValue("name").equals(name)) {
525                    eDataset = (Element) eDataset.clone();
526                    eDataset.detach();
527                    return eDataset;
528                }
529            }
530            throw new RuntimeException("undefined dataset: " + name);
531        }
532    
533        /**
534         * Initialize all the required EL Evaluators.
535         */
536        protected void initEvaluators() {
537            evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
538            evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
539            evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
540            evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
541            evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
542        }
543    
544        /**
545         * Resolve basic entities using job Configuration.
546         *
547         * @param conf :Job configuration
548         * @param appXml : Original job XML
549         * @param coordJob : Coordinator job bean to be populated.
550         * @return Resolved job XML element.
551         * @throws CoordinatorJobException thrown if failed to resolve basic entities
552         * @throws Exception thrown if failed to resolve basic entities
553         */
554        @SuppressWarnings("unchecked")
555        protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
556        throws CoordinatorJobException, Exception {
557            Element eAppXml = XmlUtils.parseXml(appXml);
558            // job's main attributes
559            // frequency
560            String val = resolveAttribute("frequency", eAppXml, evalFreq);
561            int ival = ParamChecker.checkInteger(val, "frequency");
562            ParamChecker.checkGTZero(ival, "frequency");
563            coordJob.setFrequency(ival);
564            TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
565                    .getVariable("timeunit"));
566            addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
567            // TimeUnit
568            coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
569            // End Of Duration
570            tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
571                    .getVariable("endOfDuration"));
572            addAnAttribute("end_of_duration", eAppXml, tmp.toString());
573            // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
574    
575            // Application name
576            if (this.coordName == null) {
577                val = resolveAttribute("name", eAppXml, evalNofuncs);
578                coordJob.setAppName(val);
579            }
580            else {
581                // this coord job is created from bundle
582                coordJob.setAppName(this.coordName);
583            }
584    
585            // start time
586            val = resolveAttribute("start", eAppXml, evalNofuncs);
587            ParamChecker.checkUTC(val, "start");
588            coordJob.setStartTime(DateUtils.parseDateUTC(val));
589            // end time
590            val = resolveAttribute("end", eAppXml, evalNofuncs);
591            ParamChecker.checkUTC(val, "end");
592            coordJob.setEndTime(DateUtils.parseDateUTC(val));
593            // Time zone
594            val = resolveAttribute("timezone", eAppXml, evalNofuncs);
595            ParamChecker.checkTimeZone(val, "timezone");
596            coordJob.setTimeZone(val);
597    
598            // controls
599            val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
600            if (val == "") {
601                val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
602            }
603    
604            ival = ParamChecker.checkInteger(val, "timeout");
605            if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
606                ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
607            }
608            coordJob.setTimeout(ival);
609    
610            val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
611            if (val == null || val.isEmpty()) {
612                val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1");
613            }
614            ival = ParamChecker.checkInteger(val, "concurrency");
615            coordJob.setConcurrency(ival);
616    
617            val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
618            if (val == null || val.isEmpty()) {
619                int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
620                ival = defaultThrottle;
621            }
622            else {
623                ival = ParamChecker.checkInteger(val, "throttle");
624            }
625            int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
626            float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
627            int maxThrottle = (int) (maxQueue * factor);
628            if (ival > maxThrottle || ival < 1) {
629                ival = maxThrottle;
630            }
631            LOG.debug("max throttle " + ival);
632            coordJob.setMatThrottling(ival);
633    
634            val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
635            if (val == "") {
636                val = Execution.FIFO.toString();
637            }
638            coordJob.setExecution(Execution.valueOf(val));
639            String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
640            ParamChecker.isMember(val, acceptedVals, "execution");
641    
642            // datasets
643            resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
644            // for each data set
645            resolveDataSets(eAppXml);
646            HashMap<String, String> dataNameList = new HashMap<String, String>();
647            resolveIOEvents(eAppXml, dataNameList);
648    
649            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
650                    eAppXml.getNamespace()), evalNofuncs);
651            // TODO: If action or workflow tag is missing, NullPointerException will
652            // occur
653            Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
654                    eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
655            evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
656            if (configElem != null) {
657                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
658                    resolveTagContents("name", propElem, evalData);
659                    // Want to check the data-integrity but don't want to modify the
660                    // XML
661                    // for properties only
662                    Element tmpProp = (Element) propElem.clone();
663                    resolveTagContents("value", tmpProp, evalData);
664                }
665            }
666            resolveSLA(eAppXml, coordJob);
667            return eAppXml;
668        }
669    
670        /**
671         * Resolve SLA events
672         *
673         * @param eAppXml job XML
674         * @param coordJob coordinator job bean
675         * @throws CommandException thrown if failed to resolve sla events
676         */
677        private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
678            Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info",
679                    Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
680    
681            if (eSla != null) {
682                String slaXml = XmlUtils.prettyPrint(eSla).toString();
683                try {
684                    // EL evaluation
685                    slaXml = evalSla.evaluate(slaXml, String.class);
686                    // Validate against semantic SXD
687                    XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
688                }
689                catch (Exception e) {
690                    throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
691                }
692            }
693        }
694    
695        /**
696         * Resolve input-events/data-in and output-events/data-out tags.
697         *
698         * @param eJob : Job element
699         * @throws CoordinatorJobException thrown if failed to resolve input and output events
700         */
701        @SuppressWarnings("unchecked")
702        private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
703            // Resolving input-events/data-in
704            // Clone the job and don't update anything in the original
705            Element eJob = (Element) eJobOrg.clone();
706            Element inputList = eJob.getChild("input-events", eJob.getNamespace());
707            if (inputList != null) {
708                TreeSet<String> eventNameSet = new TreeSet<String>();
709                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
710                    String dataInName = dataIn.getAttributeValue("name");
711                    dataNameList.put(dataInName, "data-in");
712                    // check whether there is any duplicate data-in name
713                    if (eventNameSet.contains(dataInName)) {
714                        throw new RuntimeException("Duplicate dataIn name " + dataInName);
715                    }
716                    else {
717                        eventNameSet.add(dataInName);
718                    }
719                    resolveTagContents("instance", dataIn, evalInst);
720                    resolveTagContents("start-instance", dataIn, evalInst);
721                    resolveTagContents("end-instance", dataIn, evalInst);
722                }
723            }
724            // Resolving output-events/data-out
725            Element outputList = eJob.getChild("output-events", eJob.getNamespace());
726            if (outputList != null) {
727                TreeSet<String> eventNameSet = new TreeSet<String>();
728                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
729                    String dataOutName = dataOut.getAttributeValue("name");
730                    dataNameList.put(dataOutName, "data-out");
731                    // check whether there is any duplicate data-out name
732                    if (eventNameSet.contains(dataOutName)) {
733                        throw new RuntimeException("Duplicate dataIn name " + dataOutName);
734                    }
735                    else {
736                        eventNameSet.add(dataOutName);
737                    }
738                    resolveTagContents("instance", dataOut, evalInst);
739                }
740            }
741    
742        }
743    
744        /**
745         * Add an attribute into XML element.
746         *
747         * @param attrName :attribute name
748         * @param elem : Element to add attribute
749         * @param value :Value of attribute
750         */
751        private void addAnAttribute(String attrName, Element elem, String value) {
752            elem.setAttribute(attrName, value);
753        }
754    
755        /**
756         * Resolve datasets using job configuration.
757         *
758         * @param eAppXml : Job Element XML
759         * @throws Exception thrown if failed to resolve datasets
760         */
761        @SuppressWarnings("unchecked")
762        private void resolveDataSets(Element eAppXml) throws Exception {
763            Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
764            if (datasetList != null) {
765    
766                List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
767                resolveDataSets(dsElems);
768                resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
769                        eAppXml.getNamespace()), evalNofuncs);
770            }
771        }
772    
773        /**
774         * Resolve datasets using job configuration.
775         *
776         * @param dsElems : Data set XML element.
777         * @throws CoordinatorJobException thrown if failed to resolve datasets
778         */
779        private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
780            for (Element dsElem : dsElems) {
781                // Setting up default TimeUnit and EndOFDuraion
782                evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
783                evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
784    
785                String val = resolveAttribute("frequency", dsElem, evalFreq);
786                int ival = ParamChecker.checkInteger(val, "frequency");
787                ParamChecker.checkGTZero(ival, "frequency");
788                addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
789                        .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
790                addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
791                        .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
792                val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
793                ParamChecker.checkUTC(val, "initial-instance");
794                checkInitialInstance(val);
795                val = resolveAttribute("timezone", dsElem, evalNofuncs);
796                ParamChecker.checkTimeZone(val, "timezone");
797                resolveTagContents("uri-template", dsElem, evalNofuncs);
798                resolveTagContents("done-flag", dsElem, evalNofuncs);
799            }
800        }
801    
802        /**
803         * Resolve the content of a tag.
804         *
805         * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
806         * @param elem : Element where the tag exists.
807         * @param eval : EL evealuator
808         * @return Resolved tag content.
809         * @throws CoordinatorJobException thrown if failed to resolve tag content
810         */
811        @SuppressWarnings("unchecked")
812        private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
813            String ret = "";
814            if (elem != null) {
815                for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
816                    if (tagElem != null) {
817                        String updated;
818                        try {
819                            updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
820    
821                        }
822                        catch (Exception e) {
823                            throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
824                        }
825                        tagElem.removeContent();
826                        tagElem.addContent(updated);
827                        ret += updated;
828                    }
829                }
830            }
831            return ret;
832        }
833    
834        /**
835         * Resolve an attribute value.
836         *
837         * @param attrName : Attribute name.
838         * @param elem : XML Element where attribute is defiend
839         * @param eval : ELEvaluator used to resolve
840         * @return Resolved attribute value
841         * @throws CoordinatorJobException thrown if failed to resolve an attribute value
842         */
843        private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
844            Attribute attr = elem.getAttribute(attrName);
845            String val = null;
846            if (attr != null) {
847                try {
848                    val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
849                }
850                catch (Exception e) {
851                    throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
852                }
853                attr.setValue(val);
854            }
855            return val;
856        }
857    
858        /**
859         * Include referred datasets into XML.
860         *
861         * @param resolvedXml : Job XML element.
862         * @param conf : Job configuration
863         * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
864         */
865        @SuppressWarnings("unchecked")
866        protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
867            Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
868            Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
869            List<String> dsList = new ArrayList<String>();
870            if (datasets != null) {
871                for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
872                    String incDSFile = includeElem.getTextTrim();
873                    includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
874                }
875                for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
876                    String dsName = e.getAttributeValue("name");
877                    if (dsList.contains(dsName)) {// Override with this DS
878                        // Remove old DS
879                        removeDataSet(allDataSets, dsName);
880                    }
881                    else {
882                        dsList.add(dsName);
883                    }
884                    allDataSets.addContent((Element) e.clone());
885                }
886            }
887            insertDataSet(resolvedXml, allDataSets);
888            resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
889        }
890    
891        /**
892         * Include one dataset file.
893         *
894         * @param incDSFile : Include data set filename.
895         * @param dsList :List of dataset names to verify the duplicate.
896         * @param allDataSets : Element that includes all dataset definitions.
897         * @param dsNameSpace : Data set name space
898         * @throws CoordinatorJobException thrown if failed to include one dataset file
899         */
900        @SuppressWarnings("unchecked")
901        private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
902        throws CoordinatorJobException {
903            Element tmpDataSets = null;
904            try {
905                String dsXml = readDefinition(incDSFile);
906                LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
907                tmpDataSets = XmlUtils.parseXml(dsXml);
908            }
909            catch (JDOMException e) {
910                LOG.warn("Error parsing included dataset [{0}].  Message [{1}]", incDSFile, e.getMessage());
911                throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
912            }
913            resolveDataSets(tmpDataSets.getChildren("dataset"));
914            for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
915                String dsName = e.getAttributeValue("name");
916                if (dsList.contains(dsName)) {
917                    throw new RuntimeException("Duplicate Dataset " + dsName);
918                }
919                dsList.add(dsName);
920                Element tmp = (Element) e.clone();
921                // TODO: Don't like to over-write the external/include DS's namespace
922                tmp.setNamespace(dsNameSpace);
923                tmp.getChild("uri-template").setNamespace(dsNameSpace);
924                if (e.getChild("done-flag") != null) {
925                    tmp.getChild("done-flag").setNamespace(dsNameSpace);
926                }
927                allDataSets.addContent(tmp);
928            }
929            // nested include
930            for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
931                String incFile = includeElem.getTextTrim();
932                includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
933            }
934        }
935    
936        /**
937         * Remove a dataset from a list of dataset.
938         *
939         * @param eDatasets : List of dataset
940         * @param name : Dataset name to be removed.
941         */
942        @SuppressWarnings("unchecked")
943        private static void removeDataSet(Element eDatasets, String name) {
944            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
945                if (eDataset.getAttributeValue("name").equals(name)) {
946                    eDataset.detach();
947                }
948            }
949            throw new RuntimeException("undefined dataset: " + name);
950        }
951    
952        /**
953         * Read coordinator definition.
954         *
955         * @param appPath application path.
956         * @return coordinator definition.
957         * @throws CoordinatorJobException thrown if the definition could not be read.
958         */
959        protected String readDefinition(String appPath) throws CoordinatorJobException {
960            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
961            // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
962            try {
963                URI uri = new URI(appPath);
964                LOG.debug("user =" + user);
965                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
966                Configuration fsConf = has.createJobConf(uri.getAuthority());
967                FileSystem fs = has.createFileSystem(user, uri, fsConf);
968                Path appDefPath = null;
969    
970                // app path could be a directory
971                Path path = new Path(uri.getPath());
972                // check file exists for dataset include file, app xml already checked
973                if (!fs.exists(path)) {
974                    throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
975                }
976                if (!fs.isFile(path)) {
977                    appDefPath = new Path(path, COORDINATOR_XML_FILE);
978                } else {
979                    appDefPath = path;
980                }
981    
982                Reader reader = new InputStreamReader(fs.open(appDefPath));
983                StringWriter writer = new StringWriter();
984                IOUtils.copyCharStream(reader, writer);
985                return writer.toString();
986            }
987            catch (IOException ex) {
988                LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
989                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
990            }
991            catch (URISyntaxException ex) {
992                LOG.warn("URISyException :" + ex.getMessage());
993                throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
994            }
995            catch (HadoopAccessorException ex) {
996                throw new CoordinatorJobException(ex);
997            }
998            catch (Exception ex) {
999                LOG.warn("Exception :", ex);
1000                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1001            }
1002        }
1003    
1004        /**
1005         * Write a coordinator job into database
1006         *
1007         * @param eJob : XML element of job
1008         * @param coordJob : Coordinator job bean
1009         * @return Job id
1010         * @throws CommandException thrown if unable to save coordinator job to db
1011         */
1012        private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException {
1013            String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
1014            coordJob.setId(jobId);
1015            coordJob.setAuthToken(this.authToken);
1016    
1017            coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
1018            coordJob.setCreatedTime(new Date());
1019            coordJob.setUser(conf.get(OozieClient.USER_NAME));
1020            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
1021            coordJob.setGroup(group);
1022            coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
1023            coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
1024            coordJob.setLastActionNumber(0);
1025            coordJob.setLastModifiedTime(new Date());
1026    
1027            if (!dryrun) {
1028                coordJob.setLastModifiedTime(new Date());
1029                try {
1030                    jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
1031                }
1032                catch (JPAExecutorException je) {
1033                    throw new CommandException(je);
1034                }
1035            }
1036            return jobId;
1037        }
1038    
1039        /*
1040         * this method checks if the initial-instance specified for a particular
1041           is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC
1042         */
1043        private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
1044            Date initialInstance, givenInstance;
1045            DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
1046            df.setTimeZone(TimeZone.getTimeZone("UTC"));
1047            try {
1048                initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
1049                givenInstance = DateUtils.parseDateUTC(val);
1050            }
1051            catch (Exception e) {
1052                throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val + "' to Date object. ",e);
1053            }
1054            if(givenInstance.compareTo(initialInstance) < 0) {
1055                throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + df.format(givenInstance) + " is earlier than the default initial instance " + df.format(initialInstance));
1056            }
1057        }
1058    
1059        /* (non-Javadoc)
1060         * @see org.apache.oozie.command.XCommand#getEntityKey()
1061         */
1062        @Override
1063        public String getEntityKey() {
1064            return null;
1065        }
1066    
1067        /* (non-Javadoc)
1068         * @see org.apache.oozie.command.XCommand#isLockRequired()
1069         */
1070        @Override
1071        protected boolean isLockRequired() {
1072            return false;
1073        }
1074    
1075        /* (non-Javadoc)
1076         * @see org.apache.oozie.command.XCommand#loadState()
1077         */
1078        @Override
1079        protected void loadState() throws CommandException {
1080            jpaService = Services.get().get(JPAService.class);
1081            if (jpaService == null) {
1082                throw new CommandException(ErrorCode.E0610);
1083            }
1084            coordJob = new CoordinatorJobBean();
1085            if (this.bundleId != null) {
1086                // this coord job is created from bundle
1087                coordJob.setBundleId(this.bundleId);
1088                // first use bundle id if submit thru bundle
1089                logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
1090                LogUtils.setLogInfo(logInfo);
1091            }
1092            if (this.coordName != null) {
1093                // this coord job is created from bundle
1094                coordJob.setAppName(this.coordName);
1095            }
1096            setJob(coordJob);
1097    
1098        }
1099    
1100        /* (non-Javadoc)
1101         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
1102         */
1103        @Override
1104        protected void verifyPrecondition() throws CommandException {
1105    
1106        }
1107    
1108        /* (non-Javadoc)
1109         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1110         */
1111        @Override
1112        public void notifyParent() throws CommandException {
1113            // update bundle action
1114            if (coordJob.getBundleId() != null) {
1115                LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1116                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1117                bundleStatusUpdate.call();
1118            }
1119        }
1120    
1121        /* (non-Javadoc)
1122         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
1123         */
1124        @Override
1125        public void updateJob() throws CommandException {
1126        }
1127    
1128        /* (non-Javadoc)
1129         * @see org.apache.oozie.command.TransitionXCommand#getJob()
1130         */
1131        @Override
1132        public Job getJob() {
1133            return coordJob;
1134        }
1135    }