001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.coord;
020
021import java.io.IOException;
022import java.io.InputStreamReader;
023import java.io.Reader;
024import java.io.StringReader;
025import java.io.StringWriter;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.util.ArrayList;
029import java.util.Calendar;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Set;
037import java.util.TreeSet;
038
039import javax.xml.transform.stream.StreamSource;
040import javax.xml.validation.Validator;
041
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.oozie.CoordinatorJobBean;
046import org.apache.oozie.ErrorCode;
047import org.apache.oozie.client.CoordinatorJob;
048import org.apache.oozie.client.Job;
049import org.apache.oozie.client.OozieClient;
050import org.apache.oozie.client.CoordinatorJob.Execution;
051import org.apache.oozie.command.CommandException;
052import org.apache.oozie.command.SubmitTransitionXCommand;
053import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
054import org.apache.oozie.coord.CoordELEvaluator;
055import org.apache.oozie.coord.CoordELFunctions;
056import org.apache.oozie.coord.CoordUtils;
057import org.apache.oozie.coord.CoordinatorJobException;
058import org.apache.oozie.coord.TimeUnit;
059import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
060import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
061import org.apache.oozie.executor.jpa.JPAExecutorException;
062import org.apache.oozie.service.CoordMaterializeTriggerService;
063import org.apache.oozie.service.ConfigurationService;
064import org.apache.oozie.service.HadoopAccessorException;
065import org.apache.oozie.service.HadoopAccessorService;
066import org.apache.oozie.service.JPAService;
067import org.apache.oozie.service.SchemaService;
068import org.apache.oozie.service.Service;
069import org.apache.oozie.service.Services;
070import org.apache.oozie.service.UUIDService;
071import org.apache.oozie.service.SchemaService.SchemaName;
072import org.apache.oozie.service.UUIDService.ApplicationType;
073import org.apache.oozie.util.ConfigUtils;
074import org.apache.oozie.util.DateUtils;
075import org.apache.oozie.util.ELEvaluator;
076import org.apache.oozie.util.ELUtils;
077import org.apache.oozie.util.IOUtils;
078import org.apache.oozie.util.InstrumentUtils;
079import org.apache.oozie.util.LogUtils;
080import org.apache.oozie.util.ParamChecker;
081import org.apache.oozie.util.ParameterVerifier;
082import org.apache.oozie.util.ParameterVerifierException;
083import org.apache.oozie.util.PropertiesUtils;
084import org.apache.oozie.util.XConfiguration;
085import org.apache.oozie.util.XmlUtils;
086import org.jdom.Attribute;
087import org.jdom.Element;
088import org.jdom.JDOMException;
089import org.jdom.Namespace;
090import org.xml.sax.SAXException;
091
092/**
093 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB
094 * table.
095 * <p>
096 * Specifically it performs the following functions: 1. Resolve all the variables or properties using job
097 * configurations. 2. Insert all datasets definition as part of the &lt;data-in&gt; and &lt;data-out&gt; tags. 3. Validate the XML
098 * at runtime.
099 */
100public class CoordSubmitXCommand extends SubmitTransitionXCommand {
101
102    protected Configuration conf;
103    protected final String bundleId;
104    protected final String coordName;
105    protected boolean dryrun;
106    protected JPAService jpaService = null;
107    private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
108
109    public static final String CONFIG_DEFAULT = "coord-config-default.xml";
110    public static final String COORDINATOR_XML_FILE = "coordinator.xml";
111    public final String COORD_INPUT_EVENTS ="input-events";
112    public final String COORD_OUTPUT_EVENTS = "output-events";
113    public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
114    public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";
115
116    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
117    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
118
119    protected CoordinatorJobBean coordJob = null;
120    /**
121     * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
122     */
123    public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
124
125    public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";
126
127    public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";
128
129    public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
130            + "coord.materialization.throttling.factor";
131
132    /**
133     * Default MAX timeout in minutes, after which coordinator input check will timeout
134     */
135    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
136
137    public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
138
139    public static final String CONF_CHECK_MAX_FREQUENCY = Service.CONF_PREFIX + "coord.check.maximum.frequency";
140
141    private ELEvaluator evalFreq = null;
142    private ELEvaluator evalNofuncs = null;
143    private ELEvaluator evalData = null;
144    private ELEvaluator evalInst = null;
145    private ELEvaluator evalAction = null;
146    private ELEvaluator evalSla = null;
147    private ELEvaluator evalTimeout = null;
148    private ELEvaluator evalInitialInstance = null;
149
150    static {
151        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
152                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
153                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
154                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
155                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
156        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
157
158        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
159        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
160        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
161    }
162
/**
 * Creates a submit command for a coordinator that does not belong to a bundle.
 *
 * @param conf : Configuration for Coordinator job; checked with {@code ParamChecker.notNull}
 */
public CoordSubmitXCommand(Configuration conf) {
    super("coord_submit", "coord_submit", 1);
    // A direct submission carries no bundle context.
    this.bundleId = null;
    this.coordName = null;
    this.conf = ParamChecker.notNull(conf, "conf");
}
174
175    /**
176     * Constructor to create the Coordinator Submit Command by bundle job.
177     *
178     * @param conf : Configuration for Coordinator job
179     * @param bundleId : bundle id
180     * @param coordName : coord name
181     */
182    protected CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) {
183        super("coord_submit", "coord_submit", 1);
184        this.conf = ParamChecker.notNull(conf, "conf");
185        this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
186        this.coordName = ParamChecker.notEmpty(coordName, "coordName");
187    }
188
/**
 * Creates a submit command with an explicit dry-run flag; otherwise identical to
 * {@link #CoordSubmitXCommand(Configuration)}.
 *
 * @param dryrun : when {@code true}, {@link #submitJob()} returns dry-run output instead of
 *        queueing materialization
 * @param conf : Configuration for Coordinator job
 */
public CoordSubmitXCommand(boolean dryrun, Configuration conf) {
    this(conf);
    this.dryrun = dryrun;
}
199
200    /* (non-Javadoc)
201     * @see org.apache.oozie.command.XCommand#execute()
202     */
203    @Override
204    protected String submit() throws CommandException {
205        LOG.info("STARTED Coordinator Submit");
206        String jobId = submitJob();
207        LOG.info("ENDED Coordinator Submit jobId=" + jobId);
208        return jobId;
209    }
210
211    protected String submitJob() throws CommandException {
212        String jobId = null;
213        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
214
215        boolean exceptionOccured = false;
216        try {
217            mergeDefaultConfig();
218
219            String appXml = readAndValidateXml();
220            coordJob.setOrigJobXml(appXml);
221            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
222
223            Element eXml = XmlUtils.parseXml(appXml);
224
225            String appNamespace = readAppNamespace(eXml);
226            coordJob.setAppNamespace(appNamespace);
227
228            ParameterVerifier.verifyParameters(conf, eXml);
229
230            appXml = XmlUtils.removeComments(appXml);
231            initEvaluators();
232            Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
233
234            validateCoordinatorJob();
235
236            // checking if the coordinator application data input/output events
237            // specify multiple data instance values in erroneous manner
238            checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
239            checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);
240
241            LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
242
243            jobId = storeToDB(appXml, eJob, coordJob);
244            // log job info for coordinator job
245            LogUtils.setLogInfo(coordJob);
246
247            if (!dryrun) {
248                queueMaterializeTransitionXCommand(jobId);
249            }
250            else {
251                return getDryRun(coordJob);
252            }
253        }
254        catch (JDOMException jex) {
255            exceptionOccured = true;
256            LOG.warn("ERROR: ", jex);
257            throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
258        }
259        catch (CoordinatorJobException cex) {
260            exceptionOccured = true;
261            LOG.warn("ERROR:  ", cex);
262            throw new CommandException(cex);
263        }
264        catch (ParameterVerifierException pex) {
265            exceptionOccured = true;
266            LOG.warn("ERROR: ", pex);
267            throw new CommandException(pex);
268        }
269        catch (IllegalArgumentException iex) {
270            exceptionOccured = true;
271            LOG.warn("ERROR:  ", iex);
272            throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex);
273        }
274        catch (Exception ex) {
275            exceptionOccured = true;
276            LOG.warn("ERROR:  ", ex);
277            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
278        }
279        finally {
280            if (exceptionOccured) {
281                if (coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")) {
282                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
283                    coordJob.resetPending();
284                }
285            }
286        }
287        return jobId;
288    }
289
290    /**
291     * Gets the dryrun output.
292     *
293     * @param coordJob the coordinatorJobBean
294     * @return the dry run
295     * @throws Exception the exception
296     */
297    protected String getDryRun(CoordinatorJobBean coordJob) throws Exception{
298        int materializationWindow = ConfigurationService
299                .getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW);
300        Date startTime = coordJob.getStartTime();
301        long startTimeMilli = startTime.getTime();
302        long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
303        Date jobEndTime = coordJob.getEndTime();
304        Date endTime = new Date(endTimeMilli);
305        if (endTime.compareTo(jobEndTime) > 0) {
306            endTime = jobEndTime;
307        }
308        String jobId = coordJob.getId();
309        LOG.info("[" + jobId + "]: Update status to RUNNING");
310        coordJob.setStatus(Job.Status.RUNNING);
311        coordJob.setPending();
312        Configuration jobConf = null;
313        try {
314            jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
315        }
316        catch (IOException e1) {
317            LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
318        }
319        String action = new CoordMaterializeTransitionXCommand(coordJob, materializationWindow, startTime,
320                endTime).materializeActions(true);
321        String output = coordJob.getJobXml() + System.getProperty("line.separator")
322        + "***actions for instance***" + action;
323        return output;
324    }
325
326    /**
327     * Queue MaterializeTransitionXCommand
328     */
329    protected void queueMaterializeTransitionXCommand(String jobId) {
330        int materializationWindow = ConfigurationService
331                .getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW);
332        queue(new CoordMaterializeTransitionXCommand(jobId, materializationWindow), 100);
333    }
334
335    /**
336     * Method that validates values in the definition for correctness. Placeholder to add more.
337     */
338    private void validateCoordinatorJob() throws Exception {
339        // check if startTime < endTime
340        if (!coordJob.getStartTime().before(coordJob.getEndTime())) {
341            throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time.");
342        }
343
344        try {
345            // Check if a coord job with cron frequency will materialize actions
346            int freq = Integer.parseInt(coordJob.getFrequency());
347
348            // Check if the frequency is faster than 5 min if enabled
349            if (ConfigurationService.getBoolean(CONF_CHECK_MAX_FREQUENCY)) {
350                CoordinatorJob.Timeunit unit = coordJob.getTimeUnit();
351                if (freq == 0 || (freq < 5 && unit == CoordinatorJob.Timeunit.MINUTE)) {
352                    throw new IllegalArgumentException("Coordinator job with frequency [" + freq +
353                            "] minutes is faster than allowed maximum of 5 minutes ("
354                            + CONF_CHECK_MAX_FREQUENCY + " is set to true)");
355                }
356            }
357        } catch (NumberFormatException e) {
358            Date start = coordJob.getStartTime();
359            Calendar cal = Calendar.getInstance();
360            cal.setTime(start);
361            cal.add(Calendar.MINUTE, -1);
362            start = cal.getTime();
363
364            Date nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(start, coordJob);
365            if (nextTime == null) {
366                throw new IllegalArgumentException("Invalid coordinator cron frequency: " + coordJob.getFrequency());
367            }
368            if (!nextTime.before(coordJob.getEndTime())) {
369                throw new IllegalArgumentException("Coordinator job with frequency '" +
370                        coordJob.getFrequency() + "' materializes no actions between start and end time.");
371            }
372        }
373    }
374
375  /*
376  * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag
377  * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list
378  */
379    private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
380        Element eventsSpec, dataSpec, instance;
381        List<Element> instanceSpecList;
382        Namespace ns = eCoordJob.getNamespace();
383        String instanceValue;
384        eventsSpec = eCoordJob.getChild(eventType, ns);
385        if (eventsSpec != null) {
386            dataSpec = eventsSpec.getChild(dataType, ns);
387            if (dataSpec != null) {
388                // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
389                instanceSpecList = dataSpec.getChildren("instance", ns);
390                Iterator instanceIter = instanceSpecList.iterator();
391                while(instanceIter.hasNext()) {
392                    instance = ((Element) instanceIter.next());
393                    if(instance.getContentSize() == 0) { //empty string or whitespace
394                        throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
395                    }
396                    instanceValue = instance.getContent(0).toString();
397                    boolean isInvalid = false;
398                    try {
399                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
400                    } catch (Exception e) {
401                        handleELParseException(eventType, dataType, instanceValue);
402                    }
403                    if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
404                        handleExpresionWithMultipleInstances(eventType, dataType, instanceValue);
405                    }
406                }
407
408                // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors
409                instanceSpecList = dataSpec.getChildren("start-instance", ns);
410                instanceIter = instanceSpecList.iterator();
411                while(instanceIter.hasNext()) {
412                    instance = ((Element) instanceIter.next());
413                    if(instance.getContentSize() == 0) { //empty string or whitespace
414                        throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!");
415                    }
416                    instanceValue = instance.getContent(0).toString();
417                    boolean isInvalid = false;
418                    try {
419                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
420                    } catch (Exception e) {
421                        handleELParseException(eventType, dataType, instanceValue);
422                    }
423                    if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0
424                        handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue);
425                    }
426                }
427
428                // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors
429                instanceSpecList = dataSpec.getChildren("end-instance", ns);
430                instanceIter = instanceSpecList.iterator();
431                while(instanceIter.hasNext()) {
432                    instance = ((Element) instanceIter.next());
433                    if(instance.getContentSize() == 0) { //empty string or whitespace
434                        throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!");
435                    }
436                    instanceValue = instance.getContent(0).toString();
437                    boolean isInvalid = false;
438                    try {
439                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
440                    } catch (Exception e) {
441                        handleELParseException(eventType, dataType, instanceValue);
442                    }
443                    if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
444                        handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue);
445                    }
446                }
447
448            }
449        }
450    }
451
452    private void handleELParseException(String eventType, String dataType, String instanceValue)
453            throws CoordinatorJobException {
454        String correctAction = null;
455        if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
456            correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
457        } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
458            correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
459        }
460        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
461                + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
462    }
463
464    private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue)
465            throws CoordinatorJobException {
466        String correctAction = null;
467        if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
468            correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
469        } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
470            correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
471        }
472        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
473                + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
474    }
475
476    private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue)
477            throws CoordinatorJobException {
478        String correctAction = "Coordinator app definition should not have multiple start-instances";
479        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue
480                + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction);
481    }
482
483    private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue)
484            throws CoordinatorJobException {
485        String correctAction = "Coordinator app definition should not have multiple end-instances";
486        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue
487                + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction);
488    }
489    /**
490     * Read the application XML and validate against coordinator Schema
491     *
492     * @return validated coordinator XML
493     * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
494     */
495    protected String readAndValidateXml() throws CoordinatorJobException {
496        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
497                OozieClient.COORDINATOR_APP_PATH);
498        String coordXml = readDefinition(appPath);
499        validateXml(coordXml);
500        return coordXml;
501    }
502
503    /**
504     * Validate against Coordinator XSD file
505     *
506     * @param xmlContent : Input coordinator xml
507     * @throws CoordinatorJobException thrown if unable to validate coordinator xml
508     */
509    private void validateXml(String xmlContent) throws CoordinatorJobException {
510        try {
511            Validator validator = Services.get().get(SchemaService.class).getValidator(SchemaName.COORDINATOR);
512            validator.validate(new StreamSource(new StringReader(xmlContent)));
513        }
514        catch (SAXException ex) {
515            LOG.warn("SAXException :", ex);
516            throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
517        }
518        catch (IOException ex) {
519            LOG.warn("IOException :", ex);
520            throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
521        }
522    }
523
524    /**
525     * Read the application XML schema namespace
526     *
527     * @param coordXmlElement input coordinator xml Element
528     * @return app xml namespace
529     * @throws CoordinatorJobException
530     */
531    private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException {
532        Namespace ns = coordXmlElement.getNamespace();
533        if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
534            throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
535                    + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
536        }
537        if (ns != null) {
538            return ns.getURI();
539        }
540        else {
541            throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
542        }
543    }
544
545    /**
546     * Merge default configuration with user-defined configuration.
547     *
548     * @throws CommandException thrown if failed to read or merge configurations
549     */
550    protected void mergeDefaultConfig() throws CommandException {
551        Path configDefault = null;
552        try {
553            String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
554            Path coordAppPath = new Path(coordAppPathStr);
555            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
556            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
557            Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
558            FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);
559
560            // app path could be a directory
561            if (!fs.isFile(coordAppPath)) {
562                configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
563            } else {
564                configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
565            }
566
567            if (fs.exists(configDefault)) {
568                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
569                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
570                XConfiguration.injectDefaults(defaultConf, conf);
571            }
572            else {
573                LOG.info("configDefault Doesn't exist " + configDefault);
574            }
575            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
576
577            // Resolving all variables in the job properties.
578            // This ensures the Hadoop Configuration semantics is preserved.
579            XConfiguration resolvedVarsConf = new XConfiguration();
580            for (Map.Entry<String, String> entry : conf) {
581                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
582            }
583            conf = resolvedVarsConf;
584        }
585        catch (IOException e) {
586            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
587                    + configDefault, e);
588        }
589        catch (HadoopAccessorException e) {
590            throw new CommandException(e);
591        }
592        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
593    }
594
595    /**
596     * The method resolve all the variables that are defined in configuration. It also include the data set definition
597     * from dataset file into XML.
598     *
599     * @param appXml : Original job XML
600     * @param conf : Configuration of the job
601     * @param coordJob : Coordinator job bean to be populated.
602     * @return Resolved and modified job XML element.
603     * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
604     * @throws Exception thrown if failed to resolve basic entities or include referred datasets
605     */
606    public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
607    throws CoordinatorJobException, Exception {
608        Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
609        includeDataSets(basicResolvedApp, conf);
610        return basicResolvedApp;
611    }
612
613    /**
614     * Insert data set into data-in and data-out tags.
615     *
616     * @param eAppXml : coordinator application XML
617     * @param eDatasets : DataSet XML
618     */
619    @SuppressWarnings("unchecked")
620    private void insertDataSet(Element eAppXml, Element eDatasets) {
621        // Adding DS definition in the coordinator XML
622        Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
623        if (inputList != null) {
624            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
625                Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
626                dataIn.getContent().add(0, eDataset);
627            }
628        }
629        Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
630        if (outputList != null) {
631            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
632                Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
633                dataOut.getContent().add(0, eDataset);
634            }
635        }
636    }
637
638    /**
639     * Find a specific dataset from a list of Datasets.
640     *
641     * @param eDatasets : List of data sets
642     * @param name : queried data set name
643     * @return one Dataset element. otherwise throw Exception
644     */
645    @SuppressWarnings("unchecked")
646    private static Element findDataSet(Element eDatasets, String name) {
647        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
648            if (eDataset.getAttributeValue("name").equals(name)) {
649                eDataset = (Element) eDataset.clone();
650                eDataset.detach();
651                return eDataset;
652            }
653        }
654        throw new RuntimeException("undefined dataset: " + name);
655    }
656
657    /**
658     * Initialize all the required EL Evaluators.
659     */
660    protected void initEvaluators() {
661        evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
662        evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
663        evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
664        evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
665        evalTimeout = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-wait-timeout");
666        evalInitialInstance = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-initial-instance");
667
668    }
669
670    /**
671     * Resolve basic entities using job Configuration.
672     *
673     * @param conf :Job configuration
674     * @param appXml : Original job XML
675     * @param coordJob : Coordinator job bean to be populated.
676     * @return Resolved job XML element.
677     * @throws CoordinatorJobException thrown if failed to resolve basic entities
678     * @throws Exception thrown if failed to resolve basic entities
679     */
680    @SuppressWarnings("unchecked")
681    protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
682    throws CoordinatorJobException, Exception {
683        Element eAppXml = XmlUtils.parseXml(appXml);
684        // job's main attributes
685        // frequency
686        String val = resolveAttribute("frequency", eAppXml, evalFreq);
687        int ival = 0;
688
689        val = ParamChecker.checkFrequency(val);
690        coordJob.setFrequency(val);
691        TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
692                .getVariable("timeunit"));
693        try {
694            Integer.parseInt(val);
695        }
696        catch (NumberFormatException ex) {
697            tmp=TimeUnit.CRON;
698        }
699
700        addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
701        // TimeUnit
702        coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
703        // End Of Duration
704        tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
705                .getVariable("endOfDuration"));
706        addAnAttribute("end_of_duration", eAppXml, tmp.toString());
707        // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
708
709        // Application name
710        if (this.coordName == null) {
711            String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf);
712            coordJob.setAppName(name);
713        }
714        else {
715            // this coord job is created from bundle
716            coordJob.setAppName(this.coordName);
717        }
718
719        // start time
720        val = resolveAttribute("start", eAppXml, evalNofuncs);
721        ParamChecker.checkDateOozieTZ(val, "start");
722        coordJob.setStartTime(DateUtils.parseDateOozieTZ(val));
723        // end time
724        val = resolveAttribute("end", eAppXml, evalNofuncs);
725        ParamChecker.checkDateOozieTZ(val, "end");
726        coordJob.setEndTime(DateUtils.parseDateOozieTZ(val));
727        // Time zone
728        val = resolveAttribute("timezone", eAppXml, evalNofuncs);
729        ParamChecker.checkTimeZone(val, "timezone");
730        coordJob.setTimeZone(val);
731
732        // controls
733        val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalTimeout);
734        if (val != null && val != "") {
735            int t = Integer.parseInt(val);
736            tmp = (evalTimeout.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalTimeout
737                    .getVariable("timeunit"));
738            switch (tmp) {
739                case HOUR:
740                    val = String.valueOf(t * 60);
741                    break;
742                case DAY:
743                    val = String.valueOf(t * 60 * 24);
744                    break;
745                case MONTH:
746                    val = String.valueOf(t * 60 * 24 * 30);
747                    break;
748                default:
749                    break;
750            }
751        }
752        else {
753            val = ConfigurationService.get(CONF_DEFAULT_TIMEOUT_NORMAL);
754        }
755
756        ival = ParamChecker.checkInteger(val, "timeout");
757        if (ival < 0 || ival > ConfigurationService.getInt(CONF_DEFAULT_MAX_TIMEOUT)) {
758            ival = ConfigurationService.getInt(CONF_DEFAULT_MAX_TIMEOUT);
759        }
760        coordJob.setTimeout(ival);
761
762        val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
763        if (val == null || val.isEmpty()) {
764            val = ConfigurationService.get(CONF_DEFAULT_CONCURRENCY);
765        }
766        ival = ParamChecker.checkInteger(val, "concurrency");
767        coordJob.setConcurrency(ival);
768
769        val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
770        if (val == null || val.isEmpty()) {
771            int defaultThrottle = ConfigurationService.getInt(CONF_DEFAULT_THROTTLE);
772            ival = defaultThrottle;
773        }
774        else {
775            ival = ParamChecker.checkInteger(val, "throttle");
776        }
777        int maxQueue = ConfigurationService.getInt(CONF_QUEUE_SIZE);
778        float factor = ConfigurationService.getFloat(CONF_MAT_THROTTLING_FACTOR);
779        int maxThrottle = (int) (maxQueue * factor);
780        if (ival > maxThrottle || ival < 1) {
781            ival = maxThrottle;
782        }
783        LOG.debug("max throttle " + ival);
784        coordJob.setMatThrottling(ival);
785
786        val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
787        if (val == "") {
788            val = Execution.FIFO.toString();
789        }
790        coordJob.setExecutionOrder(Execution.valueOf(val));
791        String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString(),
792            Execution.NONE.toString()};
793        ParamChecker.isMember(val, acceptedVals, "execution");
794
795        // datasets
796        resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
797        // for each data set
798        resolveDataSets(eAppXml);
799        HashMap<String, String> dataNameList = new HashMap<String, String>();
800        resolveIODataset(eAppXml);
801        resolveIOEvents(eAppXml, dataNameList);
802
803        if (CoordUtils.isInputLogicSpecified(eAppXml)) {
804            resolveInputLogic(eAppXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAppXml.getNamespace()), evalInst,
805                    dataNameList);
806        }
807
808        resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
809                eAppXml.getNamespace()), evalNofuncs);
810        // TODO: If action or workflow tag is missing, NullPointerException will
811        // occur
812        Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
813                eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
814        evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
815        if (configElem != null) {
816            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
817                resolveTagContents("name", propElem, evalData);
818                // Want to check the data-integrity but don't want to modify the
819                // XML
820                // for properties only
821                Element tmpProp = (Element) propElem.clone();
822                resolveTagContents("value", tmpProp, evalData);
823            }
824        }
825        evalSla = CoordELEvaluator.createELEvaluatorForDataAndConf(conf, "coord-sla-submit", dataNameList);
826        resolveSLA(eAppXml, coordJob);
827        return eAppXml;
828    }
829
830    /**
831     * Resolve SLA events
832     *
833     * @param eAppXml job XML
834     * @param coordJob coordinator job bean
835     * @throws CommandException thrown if failed to resolve sla events
836     */
837    private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
838        Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace()));
839
840        if (eSla != null) {
841            String slaXml = XmlUtils.prettyPrint(eSla).toString();
842            try {
843                // EL evaluation
844                slaXml = evalSla.evaluate(slaXml, String.class);
845                // Validate against semantic SXD
846                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
847            }
848            catch (Exception e) {
849                throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
850            }
851        }
852    }
853
854    /**
855     * Resolve input-events/data-in and output-events/data-out tags.
856     *
857     * @param eJobOrg : Job element
858     * @throws CoordinatorJobException thrown if failed to resolve input and output events
859     */
860    @SuppressWarnings("unchecked")
861    private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
862        // Resolving input-events/data-in
863        // Clone the job and don't update anything in the original
864        Element eJob = (Element) eJobOrg.clone();
865        Element inputList = eJob.getChild("input-events", eJob.getNamespace());
866        if (inputList != null) {
867            TreeSet<String> eventNameSet = new TreeSet<String>();
868            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
869                String dataInName = dataIn.getAttributeValue("name");
870                dataNameList.put(dataInName, "data-in");
871                // check whether there is any duplicate data-in name
872                if (eventNameSet.contains(dataInName)) {
873                    throw new RuntimeException("Duplicate dataIn name " + dataInName);
874                }
875                else {
876                    eventNameSet.add(dataInName);
877                }
878                resolveTagContents("instance", dataIn, evalInst);
879                resolveTagContents("start-instance", dataIn, evalInst);
880                resolveTagContents("end-instance", dataIn, evalInst);
881
882            }
883        }
884        // Resolving output-events/data-out
885        Element outputList = eJob.getChild("output-events", eJob.getNamespace());
886        if (outputList != null) {
887            TreeSet<String> eventNameSet = new TreeSet<String>();
888            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
889                String dataOutName = dataOut.getAttributeValue("name");
890                dataNameList.put(dataOutName, "data-out");
891                // check whether there is any duplicate data-out name
892                if (eventNameSet.contains(dataOutName)) {
893                    throw new RuntimeException("Duplicate dataIn name " + dataOutName);
894                }
895                else {
896                    eventNameSet.add(dataOutName);
897                }
898                resolveTagContents("instance", dataOut, evalInst);
899
900            }
901        }
902
903    }
904
905    private void resolveInputLogic(Element root, ELEvaluator evalInputLogic, HashMap<String, String> dataNameList)
906            throws Exception {
907        for (Object event : root.getChildren()) {
908            Element inputElement = (Element) event;
909            resolveAttribute("dataset", inputElement, evalInputLogic);
910            String name=resolveAttribute("name", inputElement, evalInputLogic);
911            resolveAttribute("or", inputElement, evalInputLogic);
912            resolveAttribute("and", inputElement, evalInputLogic);
913            resolveAttribute("combine", inputElement, evalInputLogic);
914
915            if (name != null) {
916                dataNameList.put(name, "data-in");
917            }
918
919            if (!inputElement.getChildren().isEmpty()) {
920                resolveInputLogic(inputElement, evalInputLogic, dataNameList);
921            }
922        }
923    }
924
925    /**
926     * Resolve input-events/dataset and output-events/dataset tags.
927     *
928     * @param eJob : Job element
929     * @throws CoordinatorJobException thrown if failed to resolve input and output events
930     */
931    @SuppressWarnings("unchecked")
932    private void resolveIODataset(Element eAppXml) throws CoordinatorJobException {
933        // Resolving input-events/data-in
934        Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
935        if (inputList != null) {
936            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
937                resolveAttribute("dataset", dataIn, evalInst);
938
939            }
940        }
941        // Resolving output-events/data-out
942        Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
943        if (outputList != null) {
944            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
945                resolveAttribute("dataset", dataOut, evalInst);
946
947            }
948        }
949
950    }
951
952
953    /**
954     * Add an attribute into XML element.
955     *
956     * @param attrName :attribute name
957     * @param elem : Element to add attribute
958     * @param value :Value of attribute
959     */
960    private void addAnAttribute(String attrName, Element elem, String value) {
961        elem.setAttribute(attrName, value);
962    }
963
964    /**
965     * Resolve datasets using job configuration.
966     *
967     * @param eAppXml : Job Element XML
968     * @throws Exception thrown if failed to resolve datasets
969     */
970    @SuppressWarnings("unchecked")
971    private void resolveDataSets(Element eAppXml) throws Exception {
972        Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
973        if (datasetList != null) {
974
975            List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
976            resolveDataSets(dsElems);
977            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
978                    eAppXml.getNamespace()), evalNofuncs);
979        }
980    }
981
982    /**
983     * Resolve datasets using job configuration.
984     *
985     * @param dsElems : Data set XML element.
986     * @throws CoordinatorJobException thrown if failed to resolve datasets
987     */
988    private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
989        for (Element dsElem : dsElems) {
990            // Setting up default TimeUnit and EndOFDuraion
991            evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
992            evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
993
994            String val = resolveAttribute("frequency", dsElem, evalFreq);
995            int ival = ParamChecker.checkInteger(val, "frequency");
996            ParamChecker.checkGTZero(ival, "frequency");
997            addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
998                    .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
999            addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
1000                    .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
1001            val = resolveAttribute("initial-instance", dsElem, evalInitialInstance);
1002            ParamChecker.checkDateOozieTZ(val, "initial-instance");
1003            checkInitialInstance(val);
1004            val = resolveAttribute("timezone", dsElem, evalNofuncs);
1005            ParamChecker.checkTimeZone(val, "timezone");
1006            resolveTagContents("uri-template", dsElem, evalNofuncs);
1007            resolveTagContents("done-flag", dsElem, evalNofuncs);
1008        }
1009    }
1010
1011    /**
1012     * Resolve the content of a tag.
1013     *
1014     * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
1015     * @param elem : Element where the tag exists.
1016     * @param eval : EL evealuator
1017     * @return Resolved tag content.
1018     * @throws CoordinatorJobException thrown if failed to resolve tag content
1019     */
1020    @SuppressWarnings("unchecked")
1021    private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
1022        String ret = "";
1023        if (elem != null) {
1024            for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
1025                if (tagElem != null) {
1026                    String updated;
1027                    try {
1028                        updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
1029
1030                    }
1031                    catch (Exception e) {
1032                        throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
1033                    }
1034                    tagElem.removeContent();
1035                    tagElem.addContent(updated);
1036                    ret += updated;
1037                }
1038            }
1039        }
1040        return ret;
1041    }
1042
1043    /**
1044     * Resolve an attribute value.
1045     *
1046     * @param attrName : Attribute name.
1047     * @param elem : XML Element where attribute is defiend
1048     * @param eval : ELEvaluator used to resolve
1049     * @return Resolved attribute value
1050     * @throws CoordinatorJobException thrown if failed to resolve an attribute value
1051     */
1052    private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
1053        Attribute attr = elem.getAttribute(attrName);
1054        String val = null;
1055        if (attr != null) {
1056            try {
1057                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
1058            }
1059            catch (Exception e) {
1060                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
1061            }
1062            attr.setValue(val);
1063        }
1064        return val;
1065    }
1066
1067    /**
1068     * Include referred datasets into XML.
1069     *
1070     * @param resolvedXml : Job XML element.
1071     * @param conf : Job configuration
1072     * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
1073     */
1074    @SuppressWarnings("unchecked")
1075    protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
1076        Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
1077        Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
1078        List<String> dsList = new ArrayList<String>();
1079        if (datasets != null) {
1080            for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
1081                String incDSFile = includeElem.getTextTrim();
1082                includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
1083            }
1084            for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
1085                String dsName = e.getAttributeValue("name");
1086                if (dsList.contains(dsName)) {// Override with this DS
1087                    // Remove duplicate
1088                    removeDataSet(allDataSets, dsName);
1089                }
1090                else {
1091                    dsList.add(dsName);
1092                }
1093                allDataSets.addContent((Element) e.clone());
1094            }
1095        }
1096        insertDataSet(resolvedXml, allDataSets);
1097        resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
1098    }
1099
1100    /**
1101     * Include one dataset file.
1102     *
1103     * @param incDSFile : Include data set filename.
1104     * @param dsList :List of dataset names to verify the duplicate.
1105     * @param allDataSets : Element that includes all dataset definitions.
1106     * @param dsNameSpace : Data set name space
1107     * @throws CoordinatorJobException thrown if failed to include one dataset file
1108     */
1109    @SuppressWarnings("unchecked")
1110    private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
1111    throws CoordinatorJobException {
1112        Element tmpDataSets = null;
1113        try {
1114            String dsXml = readDefinition(incDSFile);
1115            LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
1116            tmpDataSets = XmlUtils.parseXml(dsXml);
1117        }
1118        catch (JDOMException e) {
1119            LOG.warn("Error parsing included dataset [{0}].  Message [{1}]", incDSFile, e.getMessage());
1120            throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
1121        }
1122        resolveDataSets(tmpDataSets.getChildren("dataset"));
1123        for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
1124            String dsName = e.getAttributeValue("name");
1125            if (dsList.contains(dsName)) {
1126                throw new RuntimeException("Duplicate Dataset " + dsName);
1127            }
1128            dsList.add(dsName);
1129            Element tmp = (Element) e.clone();
1130            // TODO: Don't like to over-write the external/include DS's namespace
1131            tmp.setNamespace(dsNameSpace);
1132            tmp.getChild("uri-template").setNamespace(dsNameSpace);
1133            if (e.getChild("done-flag") != null) {
1134                tmp.getChild("done-flag").setNamespace(dsNameSpace);
1135            }
1136            allDataSets.addContent(tmp);
1137        }
1138        // nested include
1139        for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
1140            String incFile = includeElem.getTextTrim();
1141            includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
1142        }
1143    }
1144
1145    /**
1146     * Remove a dataset from a list of dataset.
1147     *
1148     * @param eDatasets : List of dataset
1149     * @param name : Dataset name to be removed.
1150     */
1151    @SuppressWarnings("unchecked")
1152    private static void removeDataSet(Element eDatasets, String name) {
1153        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
1154            if (eDataset.getAttributeValue("name").equals(name)) {
1155                eDataset.detach();
1156                return;
1157            }
1158        }
1159        throw new RuntimeException("undefined dataset: " + name);
1160    }
1161
1162    /**
1163     * Read coordinator definition.
1164     *
1165     * @param appPath application path.
1166     * @return coordinator definition.
1167     * @throws CoordinatorJobException thrown if the definition could not be read.
1168     */
1169    protected String readDefinition(String appPath) throws CoordinatorJobException {
1170        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
1171        // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
1172        try {
1173            URI uri = new URI(appPath);
1174            LOG.debug("user =" + user);
1175            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1176            Configuration fsConf = has.createJobConf(uri.getAuthority());
1177            FileSystem fs = has.createFileSystem(user, uri, fsConf);
1178            Path appDefPath = null;
1179
1180            // app path could be a directory
1181            Path path = new Path(uri.getPath());
1182            // check file exists for dataset include file, app xml already checked
1183            if (!fs.exists(path)) {
1184                throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
1185            }
1186            if (!fs.isFile(path)) {
1187                appDefPath = new Path(path, COORDINATOR_XML_FILE);
1188            } else {
1189                appDefPath = path;
1190            }
1191
1192            Reader reader = new InputStreamReader(fs.open(appDefPath));
1193            StringWriter writer = new StringWriter();
1194            IOUtils.copyCharStream(reader, writer);
1195            return writer.toString();
1196        }
1197        catch (IOException ex) {
1198            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
1199            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1200        }
1201        catch (URISyntaxException ex) {
1202            LOG.warn("URISyException :" + ex.getMessage());
1203            throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
1204        }
1205        catch (HadoopAccessorException ex) {
1206            throw new CoordinatorJobException(ex);
1207        }
1208        catch (Exception ex) {
1209            LOG.warn("Exception :", ex);
1210            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1211        }
1212    }
1213
1214    /**
1215     * Write a coordinator job into database
1216     *
1217     *@param appXML : Coordinator definition xml
1218     * @param eJob : XML element of job
1219     * @param coordJob : Coordinator job bean
1220     * @return Job id
1221     * @throws CommandException thrown if unable to save coordinator job to db
1222     */
1223    protected String storeToDB(String appXML, Element eJob, CoordinatorJobBean coordJob) throws CommandException {
1224        String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
1225        coordJob.setId(jobId);
1226
1227        coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
1228        coordJob.setCreatedTime(new Date());
1229        coordJob.setUser(conf.get(OozieClient.USER_NAME));
1230        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
1231        coordJob.setGroup(group);
1232        coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
1233        coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
1234        coordJob.setLastActionNumber(0);
1235        coordJob.setLastModifiedTime(new Date());
1236
1237        if (!dryrun) {
1238            coordJob.setLastModifiedTime(new Date());
1239            try {
1240                CoordJobQueryExecutor.getInstance().insert(coordJob);
1241            }
1242            catch (JPAExecutorException jpaee) {
1243                coordJob.setId(null);
1244                coordJob.setStatus(CoordinatorJob.Status.FAILED);
1245                throw new CommandException(jpaee);
1246            }
1247        }
1248        return jobId;
1249    }
1250
1251    /*
1252     * this method checks if the initial-instance specified for a particular
1253       is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC
1254     */
1255    private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
1256        Date initialInstance, givenInstance;
1257        try {
1258            initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
1259            givenInstance = DateUtils.parseDateOozieTZ(val);
1260        }
1261        catch (Exception e) {
1262            throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val +
1263                                               "' to Date object. ",e);
1264        }
1265        if(givenInstance.compareTo(initialInstance) < 0) {
1266            throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val +
1267                    " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance));
1268        }
1269    }
1270
    /**
     * Returns the entity key of this command. This implementation always returns {@code null}.
     *
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return null;
    }
1278
    /**
     * Tells whether this command requires a lock. This implementation always returns {@code false}.
     *
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }
1286
1287    /* (non-Javadoc)
1288     * @see org.apache.oozie.command.XCommand#loadState()
1289     */
1290    @Override
1291    protected void loadState() throws CommandException {
1292        jpaService = Services.get().get(JPAService.class);
1293        if (jpaService == null) {
1294            throw new CommandException(ErrorCode.E0610);
1295        }
1296        coordJob = new CoordinatorJobBean();
1297        if (this.bundleId != null) {
1298            // this coord job is created from bundle
1299            coordJob.setBundleId(this.bundleId);
1300            // first use bundle id if submit thru bundle
1301            LogUtils.setLogInfo(this.bundleId);
1302        }
1303        if (this.coordName != null) {
1304            // this coord job is created from bundle
1305            coordJob.setAppName(this.coordName);
1306        }
1307        setJob(coordJob);
1308
1309    }
1310
    /**
     * Precondition hook; this command checks nothing here.
     *
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException {
        // intentionally empty

    }
1318
1319    /* (non-Javadoc)
1320     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1321     */
1322    @Override
1323    public void notifyParent() throws CommandException {
1324        // update bundle action
1325        if (coordJob.getBundleId() != null) {
1326            LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1327            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1328            bundleStatusUpdate.call();
1329        }
1330    }
1331
    /**
     * Job update hook; intentionally a no-op in this command.
     *
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() throws CommandException {
    }
1338
    /**
     * Returns the coordinator job bean held by this command.
     *
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }
1346
    /**
     * Write hook; intentionally a no-op in this command.
     */
    @Override
    public void performWrites() throws CommandException {
    }
1350}