001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.InputStreamReader;
022    import java.io.Reader;
023    import java.io.StringReader;
024    import java.io.StringWriter;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.ArrayList;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.List;
032    import java.util.Map;
033    import java.util.Set;
034    import java.util.TreeSet;
035    
036    import javax.xml.transform.stream.StreamSource;
037    import javax.xml.validation.Validator;
038    
039    import org.apache.hadoop.conf.Configuration;
040    import org.apache.hadoop.fs.FileSystem;
041    import org.apache.hadoop.fs.Path;
042    import org.apache.oozie.CoordinatorJobBean;
043    import org.apache.oozie.ErrorCode;
044    import org.apache.oozie.client.CoordinatorJob;
045    import org.apache.oozie.client.Job;
046    import org.apache.oozie.client.OozieClient;
047    import org.apache.oozie.client.CoordinatorJob.Execution;
048    import org.apache.oozie.command.CommandException;
049    import org.apache.oozie.command.SubmitTransitionXCommand;
050    import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
051    import org.apache.oozie.coord.CoordELEvaluator;
052    import org.apache.oozie.coord.CoordELFunctions;
053    import org.apache.oozie.coord.CoordinatorJobException;
054    import org.apache.oozie.coord.TimeUnit;
055    import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
056    import org.apache.oozie.executor.jpa.JPAExecutorException;
057    import org.apache.oozie.service.DagXLogInfoService;
058    import org.apache.oozie.service.HadoopAccessorException;
059    import org.apache.oozie.service.HadoopAccessorService;
060    import org.apache.oozie.service.JPAService;
061    import org.apache.oozie.service.SchemaService;
062    import org.apache.oozie.service.Service;
063    import org.apache.oozie.service.Services;
064    import org.apache.oozie.service.UUIDService;
065    import org.apache.oozie.service.WorkflowAppService;
066    import org.apache.oozie.service.SchemaService.SchemaName;
067    import org.apache.oozie.service.UUIDService.ApplicationType;
068    import org.apache.oozie.util.DateUtils;
069    import org.apache.oozie.util.ELEvaluator;
070    import org.apache.oozie.util.IOUtils;
071    import org.apache.oozie.util.InstrumentUtils;
072    import org.apache.oozie.util.LogUtils;
073    import org.apache.oozie.util.ParamChecker;
074    import org.apache.oozie.util.PropertiesUtils;
075    import org.apache.oozie.util.XConfiguration;
076    import org.apache.oozie.util.XLog;
077    import org.apache.oozie.util.XmlUtils;
078    import org.jdom.Attribute;
079    import org.jdom.Element;
080    import org.jdom.JDOMException;
081    import org.jdom.Namespace;
082    import org.xml.sax.SAXException;
083    
/**
 * This class resolves a coordinator job XML and writes the job information into a DB table.
 * <p/>
 * Specifically it performs the following functions: 1. Resolve all the variables or properties using job
 * configurations. 2. Insert all dataset definitions as part of the {@code <data-in>} and {@code <data-out>} tags.
 * 3. Validate the XML at runtime.
 */
092    public class CoordSubmitXCommand extends SubmitTransitionXCommand {
093    
094        private Configuration conf;
095        private final String authToken;
096        private final String bundleId;
097        private final String coordName;
098        private boolean dryrun;
099        private JPAService jpaService = null;
100        private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
101    
102        public static final String CONFIG_DEFAULT = "coord-config-default.xml";
103        public static final String COORDINATOR_XML_FILE = "coordinator.xml";
104    
105        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
106        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
107    
108        private CoordinatorJobBean coordJob = null;
109        /**
110         * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
111         */
112        public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
113    
114        public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";
115    
116        public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";
117    
118        public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
119                + "coord.materialization.throttling.factor";
120    
121        /**
122         * Default MAX timeout in minutes, after which coordinator input check will timeout
123         */
124        public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
125    
126    
127        public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
128    
129        private ELEvaluator evalFreq = null;
130        private ELEvaluator evalNofuncs = null;
131        private ELEvaluator evalData = null;
132        private ELEvaluator evalInst = null;
133        private ELEvaluator evalSla = null;
134    
135        static {
136            String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
137                    PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
138                    PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
139                    PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
140                    PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
141            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
142    
143            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
144                    WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
145            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
146            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
147        }
148    
149        /**
150         * Constructor to create the Coordinator Submit Command.
151         *
152         * @param conf : Configuration for Coordinator job
153         * @param authToken : To be used for authentication
154         */
155        public CoordSubmitXCommand(Configuration conf, String authToken) {
156            super("coord_submit", "coord_submit", 1);
157            this.conf = ParamChecker.notNull(conf, "conf");
158            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
159            this.bundleId = null;
160            this.coordName = null;
161        }
162    
163        /**
164         * Constructor to create the Coordinator Submit Command by bundle job.
165         *
166         * @param conf : Configuration for Coordinator job
167         * @param authToken : To be used for authentication
168         * @param bundleId : bundle id
169         * @param coordName : coord name
170         */
171        public CoordSubmitXCommand(Configuration conf, String authToken, String bundleId, String coordName) {
172            super("coord_submit", "coord_submit", 1);
173            this.conf = ParamChecker.notNull(conf, "conf");
174            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
175            this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
176            this.coordName = ParamChecker.notEmpty(coordName, "coordName");
177        }
178    
/**
 * Creates a submit command for a coordinator job that is not part of a bundle, optionally as a dry run.
 * <p/>
 * Argument checking is delegated to {@link #CoordSubmitXCommand(Configuration, String)}.
 *
 * @param dryrun true to run the submission as a dry run
 * @param conf configuration for the coordinator job
 * @param authToken token used for authentication
 */
public CoordSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
    this(conf, authToken);
    this.dryrun = dryrun;
}
190    
191        /* (non-Javadoc)
192         * @see org.apache.oozie.command.XCommand#execute()
193         */
194        @Override
195        protected String submit() throws CommandException {
196            String jobId = null;
197            LOG.info("STARTED Coordinator Submit");
198            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
199    
200            boolean exceptionOccured = false;
201            try {
202                mergeDefaultConfig();
203    
204                String appXml = readAndValidateXml();
205                coordJob.setOrigJobXml(appXml);
206                LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
207    
208                String appNamespace = readAppNamespace(appXml);
209                coordJob.setAppNamespace(appNamespace);
210    
211                appXml = XmlUtils.removeComments(appXml);
212                initEvaluators();
213                Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
214                LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
215    
216                jobId = storeToDB(eJob, coordJob);
217                // log job info for coordinator job
218                LogUtils.setLogInfo(coordJob, logInfo);
219                LOG = XLog.resetPrefix(LOG);
220    
221                if (!dryrun) {
222                    // submit a command to materialize jobs for the next 1 hour (3600 secs)
223                    // so we don't wait 10 mins for the Service to run.
224                    queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
225                }
226                else {
227                    Date startTime = coordJob.getStartTime();
228                    long startTimeMilli = startTime.getTime();
229                    long endTimeMilli = startTimeMilli + (3600 * 1000);
230                    Date jobEndTime = coordJob.getEndTime();
231                    Date endTime = new Date(endTimeMilli);
232                    if (endTime.compareTo(jobEndTime) > 0) {
233                        endTime = jobEndTime;
234                    }
235                    jobId = coordJob.getId();
236                    LOG.info("[" + jobId + "]: Update status to RUNNING");
237                    coordJob.setStatus(Job.Status.RUNNING);
238                    coordJob.setPending();
239                    CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
240                            endTime);
241                    Configuration jobConf = null;
242                    try {
243                        jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
244                    }
245                    catch (IOException e1) {
246                        LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
247                    }
248                    String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
249                    String output = coordJob.getJobXml() + System.getProperty("line.separator")
250                    + "***actions for instance***" + action;
251                    return output;
252                }
253            }
254            catch (CoordinatorJobException cex) {
255                exceptionOccured = true;
256                LOG.warn("ERROR:  ", cex);
257                throw new CommandException(cex);
258            }
259            catch (IllegalArgumentException iex) {
260                exceptionOccured = true;
261                LOG.warn("ERROR:  ", iex);
262                throw new CommandException(ErrorCode.E1003, iex);
263            }
264            catch (Exception ex) {
265                exceptionOccured = true;
266                LOG.warn("ERROR:  ", ex);
267                throw new CommandException(ErrorCode.E0803, ex);
268            }
269            finally {
270                if (exceptionOccured) {
271                    if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){
272                        coordJob.setStatus(CoordinatorJob.Status.FAILED);
273                        coordJob.resetPending();
274                    }
275                }
276            }
277    
278            LOG.info("ENDED Coordinator Submit jobId=" + jobId);
279            return jobId;
280        }
281    
282        /**
283         * Read the application XML and validate against coordinator Schema
284         *
285         * @return validated coordinator XML
286         * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
287         */
288        private String readAndValidateXml() throws CoordinatorJobException {
289            String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
290                    OozieClient.COORDINATOR_APP_PATH);
291            String coordXml = readDefinition(appPath);
292            validateXml(coordXml);
293            return coordXml;
294        }
295    
296        /**
297         * Validate against Coordinator XSD file
298         *
299         * @param xmlContent : Input coordinator xml
300         * @throws CoordinatorJobException thrown if unable to validate coordinator xml
301         */
302        private void validateXml(String xmlContent) throws CoordinatorJobException {
303            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
304            Validator validator = schema.newValidator();
305            try {
306                validator.validate(new StreamSource(new StringReader(xmlContent)));
307            }
308            catch (SAXException ex) {
309                LOG.warn("SAXException :", ex);
310                throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
311            }
312            catch (IOException ex) {
313                LOG.warn("IOException :", ex);
314                throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
315            }
316        }
317    
318        /**
319         * Read the application XML schema namespace
320         *
321         * @param xmlContent input coordinator xml
322         * @return app xml namespace
323         * @throws CoordinatorJobException
324         */
325        private String readAppNamespace(String xmlContent) throws CoordinatorJobException {
326            try {
327                Element coordXmlElement = XmlUtils.parseXml(xmlContent);
328                Namespace ns = coordXmlElement.getNamespace();
329                if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
330                    throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
331                            + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
332                }
333                if (ns != null) {
334                    return ns.getURI();
335                }
336                else {
337                    throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
338                }
339            }
340            catch (JDOMException ex) {
341                LOG.warn("JDOMException :", ex);
342                throw new CoordinatorJobException(ErrorCode.E0700, ex.getMessage(), ex);
343            }
344        }
345    
346        /**
347         * Merge default configuration with user-defined configuration.
348         *
349         * @throws CommandException thrown if failed to read or merge configurations
350         */
351        protected void mergeDefaultConfig() throws CommandException {
352            Path configDefault = null;
353            try {
354                String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
355                Path coordAppPath = new Path(coordAppPathStr);
356                String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
357                String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
358                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
359                        coordAppPath.toUri(), new Configuration());
360    
361                // app path could be a directory
362                if (!fs.isFile(coordAppPath)) {
363                    configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
364                } else {
365                    configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
366                }
367    
368                if (fs.exists(configDefault)) {
369                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
370                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
371                    XConfiguration.injectDefaults(defaultConf, conf);
372                }
373                else {
374                    LOG.info("configDefault Doesn't exist " + configDefault);
375                }
376                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
377    
378                // Resolving all variables in the job properties.
379                // This ensures the Hadoop Configuration semantics is preserved.
380                XConfiguration resolvedVarsConf = new XConfiguration();
381                for (Map.Entry<String, String> entry : conf) {
382                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
383                }
384                conf = resolvedVarsConf;
385            }
386            catch (IOException e) {
387                throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
388                        + configDefault, e);
389            }
390            catch (HadoopAccessorException e) {
391                throw new CommandException(e);
392            }
393            LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
394        }
395    
396        /**
397         * The method resolve all the variables that are defined in configuration. It also include the data set definition
398         * from dataset file into XML.
399         *
400         * @param appXml : Original job XML
401         * @param conf : Configuration of the job
402         * @param coordJob : Coordinator job bean to be populated.
403         * @return Resolved and modified job XML element.
404         * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
405         * @throws Exception thrown if failed to resolve basic entities or include referred datasets
406         */
407        public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
408        throws CoordinatorJobException, Exception {
409            Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
410            includeDataSets(basicResolvedApp, conf);
411            return basicResolvedApp;
412        }
413    
414        /**
415         * Insert data set into data-in and data-out tags.
416         *
417         * @param eAppXml : coordinator application XML
418         * @param eDatasets : DataSet XML
419         */
420        @SuppressWarnings("unchecked")
421        private void insertDataSet(Element eAppXml, Element eDatasets) {
422            // Adding DS definition in the coordinator XML
423            Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
424            if (inputList != null) {
425                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
426                    Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
427                    dataIn.getContent().add(0, eDataset);
428                }
429            }
430            Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
431            if (outputList != null) {
432                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
433                    Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
434                    dataOut.getContent().add(0, eDataset);
435                }
436            }
437        }
438    
439        /**
440         * Find a specific dataset from a list of Datasets.
441         *
442         * @param eDatasets : List of data sets
443         * @param name : queried data set name
444         * @return one Dataset element. otherwise throw Exception
445         */
446        @SuppressWarnings("unchecked")
447        private static Element findDataSet(Element eDatasets, String name) {
448            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
449                if (eDataset.getAttributeValue("name").equals(name)) {
450                    eDataset = (Element) eDataset.clone();
451                    eDataset.detach();
452                    return eDataset;
453                }
454            }
455            throw new RuntimeException("undefined dataset: " + name);
456        }
457    
458        /**
459         * Initialize all the required EL Evaluators.
460         */
461        protected void initEvaluators() {
462            evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
463            evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
464            evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
465            evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
466        }
467    
468        /**
469         * Resolve basic entities using job Configuration.
470         *
471         * @param conf :Job configuration
472         * @param appXml : Original job XML
473         * @param coordJob : Coordinator job bean to be populated.
474         * @return Resolved job XML element.
475         * @throws CoordinatorJobException thrown if failed to resolve basic entities
476         * @throws Exception thrown if failed to resolve basic entities
477         */
478        @SuppressWarnings("unchecked")
479        protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
480        throws CoordinatorJobException, Exception {
481            Element eAppXml = XmlUtils.parseXml(appXml);
482            // job's main attributes
483            // frequency
484            String val = resolveAttribute("frequency", eAppXml, evalFreq);
485            int ival = ParamChecker.checkInteger(val, "frequency");
486            ParamChecker.checkGTZero(ival, "frequency");
487            coordJob.setFrequency(ival);
488            TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
489                    .getVariable("timeunit"));
490            addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
491            // TimeUnit
492            coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
493            // End Of Duration
494            tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
495                    .getVariable("endOfDuration"));
496            addAnAttribute("end_of_duration", eAppXml, tmp.toString());
497            // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
498    
499            // Application name
500            if (this.coordName == null) {
501                val = resolveAttribute("name", eAppXml, evalNofuncs);
502                coordJob.setAppName(val);
503            }
504            else {
505                // this coord job is created from bundle
506                coordJob.setAppName(this.coordName);
507            }
508    
509            // start time
510            val = resolveAttribute("start", eAppXml, evalNofuncs);
511            ParamChecker.checkUTC(val, "start");
512            coordJob.setStartTime(DateUtils.parseDateUTC(val));
513            // end time
514            val = resolveAttribute("end", eAppXml, evalNofuncs);
515            ParamChecker.checkUTC(val, "end");
516            coordJob.setEndTime(DateUtils.parseDateUTC(val));
517            // Time zone
518            val = resolveAttribute("timezone", eAppXml, evalNofuncs);
519            ParamChecker.checkTimeZone(val, "timezone");
520            coordJob.setTimeZone(val);
521    
522            // controls
523            val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
524            if (val == "") {
525                val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
526            }
527    
528            ival = ParamChecker.checkInteger(val, "timeout");
529            if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
530                ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
531            }
532            coordJob.setTimeout(ival);
533    
534            val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
535            if (val == null || val.isEmpty()) {
536                val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "-1");
537            }
538            ival = ParamChecker.checkInteger(val, "concurrency");
539            coordJob.setConcurrency(ival);
540    
541            val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
542            if (val == null || val.isEmpty()) {
543                int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
544                ival = defaultThrottle;
545            }
546            else {
547                ival = ParamChecker.checkInteger(val, "throttle");
548            }
549            int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
550            float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
551            int maxThrottle = (int) (maxQueue * factor);
552            if (ival > maxThrottle || ival < 1) {
553                ival = maxThrottle;
554            }
555            LOG.debug("max throttle " + ival);
556            coordJob.setMatThrottling(ival);
557    
558            val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
559            if (val == "") {
560                val = Execution.FIFO.toString();
561            }
562            coordJob.setExecution(Execution.valueOf(val));
563            String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
564            ParamChecker.isMember(val, acceptedVals, "execution");
565    
566            // datasets
567            resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
568            // for each data set
569            resolveDataSets(eAppXml);
570            HashMap<String, String> dataNameList = new HashMap<String, String>();
571            resolveIOEvents(eAppXml, dataNameList);
572    
573            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
574                    eAppXml.getNamespace()), evalNofuncs);
575            // TODO: If action or workflow tag is missing, NullPointerException will
576            // occur
577            Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
578                    eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
579            evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
580            if (configElem != null) {
581                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
582                    resolveTagContents("name", propElem, evalData);
583                    // Want to check the data-integrity but don't want to modify the
584                    // XML
585                    // for properties only
586                    Element tmpProp = (Element) propElem.clone();
587                    resolveTagContents("value", tmpProp, evalData);
588                }
589            }
590            resolveSLA(eAppXml, coordJob);
591            return eAppXml;
592        }
593    
594        /**
595         * Resolve SLA events
596         *
597         * @param eAppXml job XML
598         * @param coordJob coordinator job bean
599         * @throws CommandException thrown if failed to resolve sla events
600         */
601        private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
602            Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info",
603                    Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
604    
605            if (eSla != null) {
606                String slaXml = XmlUtils.prettyPrint(eSla).toString();
607                try {
608                    // EL evaluation
609                    slaXml = evalSla.evaluate(slaXml, String.class);
610                    // Validate against semantic SXD
611                    XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
612                }
613                catch (Exception e) {
614                    throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
615                }
616            }
617        }
618    
619        /**
620         * Resolve input-events/data-in and output-events/data-out tags.
621         *
622         * @param eJob : Job element
623         * @throws CoordinatorJobException thrown if failed to resolve input and output events
624         */
625        @SuppressWarnings("unchecked")
626        private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
627            // Resolving input-events/data-in
628            // Clone the job and don't update anything in the original
629            Element eJob = (Element) eJobOrg.clone();
630            Element inputList = eJob.getChild("input-events", eJob.getNamespace());
631            if (inputList != null) {
632                TreeSet<String> eventNameSet = new TreeSet<String>();
633                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
634                    String dataInName = dataIn.getAttributeValue("name");
635                    dataNameList.put(dataInName, "data-in");
636                    // check whether there is any duplicate data-in name
637                    if (eventNameSet.contains(dataInName)) {
638                        throw new RuntimeException("Duplicate dataIn name " + dataInName);
639                    }
640                    else {
641                        eventNameSet.add(dataInName);
642                    }
643                    resolveTagContents("instance", dataIn, evalInst);
644                    resolveTagContents("start-instance", dataIn, evalInst);
645                    resolveTagContents("end-instance", dataIn, evalInst);
646                }
647            }
648            // Resolving output-events/data-out
649            Element outputList = eJob.getChild("output-events", eJob.getNamespace());
650            if (outputList != null) {
651                TreeSet<String> eventNameSet = new TreeSet<String>();
652                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
653                    String dataOutName = dataOut.getAttributeValue("name");
654                    dataNameList.put(dataOutName, "data-out");
655                    // check whether there is any duplicate data-out name
656                    if (eventNameSet.contains(dataOutName)) {
657                        throw new RuntimeException("Duplicate dataIn name " + dataOutName);
658                    }
659                    else {
660                        eventNameSet.add(dataOutName);
661                    }
662                    resolveTagContents("instance", dataOut, evalInst);
663                }
664            }
665    
666        }
667    
668        /**
669         * Add an attribute into XML element.
670         *
671         * @param attrName :attribute name
672         * @param elem : Element to add attribute
673         * @param value :Value of attribute
674         */
675        private void addAnAttribute(String attrName, Element elem, String value) {
676            elem.setAttribute(attrName, value);
677        }
678    
679        /**
680         * Resolve datasets using job configuration.
681         *
682         * @param eAppXml : Job Element XML
683         * @throws Exception thrown if failed to resolve datasets
684         */
685        @SuppressWarnings("unchecked")
686        private void resolveDataSets(Element eAppXml) throws Exception {
687            Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
688            if (datasetList != null) {
689    
690                List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
691                resolveDataSets(dsElems);
692                resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
693                        eAppXml.getNamespace()), evalNofuncs);
694            }
695        }
696    
697        /**
698         * Resolve datasets using job configuration.
699         *
700         * @param dsElems : Data set XML element.
701         * @throws CoordinatorJobException thrown if failed to resolve datasets
702         */
703        private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
704            for (Element dsElem : dsElems) {
705                // Setting up default TimeUnit and EndOFDuraion
706                evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
707                evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
708    
709                String val = resolveAttribute("frequency", dsElem, evalFreq);
710                int ival = ParamChecker.checkInteger(val, "frequency");
711                ParamChecker.checkGTZero(ival, "frequency");
712                addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
713                        .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
714                addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
715                        .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
716                val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
717                ParamChecker.checkUTC(val, "initial-instance");
718                val = resolveAttribute("timezone", dsElem, evalNofuncs);
719                ParamChecker.checkTimeZone(val, "timezone");
720                resolveTagContents("uri-template", dsElem, evalNofuncs);
721                resolveTagContents("done-flag", dsElem, evalNofuncs);
722            }
723        }
724    
725        /**
726         * Resolve the content of a tag.
727         *
728         * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
729         * @param elem : Element where the tag exists.
730         * @param eval : EL evealuator
731         * @return Resolved tag content.
732         * @throws CoordinatorJobException thrown if failed to resolve tag content
733         */
734        @SuppressWarnings("unchecked")
735        private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
736            String ret = "";
737            if (elem != null) {
738                for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
739                    if (tagElem != null) {
740                        String updated;
741                        try {
742                            updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
743    
744                        }
745                        catch (Exception e) {
746                            throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
747                        }
748                        tagElem.removeContent();
749                        tagElem.addContent(updated);
750                        ret += updated;
751                    }
752                }
753            }
754            return ret;
755        }
756    
757        /**
758         * Resolve an attribute value.
759         *
760         * @param attrName : Attribute name.
761         * @param elem : XML Element where attribute is defiend
762         * @param eval : ELEvaluator used to resolve
763         * @return Resolved attribute value
764         * @throws CoordinatorJobException thrown if failed to resolve an attribute value
765         */
766        private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
767            Attribute attr = elem.getAttribute(attrName);
768            String val = null;
769            if (attr != null) {
770                try {
771                    val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
772                }
773                catch (Exception e) {
774                    throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
775                }
776                attr.setValue(val);
777            }
778            return val;
779        }
780    
781        /**
782         * Include referred datasets into XML.
783         *
784         * @param resolvedXml : Job XML element.
785         * @param conf : Job configuration
786         * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
787         */
788        @SuppressWarnings("unchecked")
789        protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
790            Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
791            Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
792            List<String> dsList = new ArrayList<String>();
793            if (datasets != null) {
794                for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
795                    String incDSFile = includeElem.getTextTrim();
796                    includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
797                }
798                for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
799                    String dsName = e.getAttributeValue("name");
800                    if (dsList.contains(dsName)) {// Override with this DS
801                        // Remove old DS
802                        removeDataSet(allDataSets, dsName);
803                    }
804                    else {
805                        dsList.add(dsName);
806                    }
807                    allDataSets.addContent((Element) e.clone());
808                }
809            }
810            insertDataSet(resolvedXml, allDataSets);
811            resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
812        }
813    
814        /**
815         * Include one dataset file.
816         *
817         * @param incDSFile : Include data set filename.
818         * @param dsList :List of dataset names to verify the duplicate.
819         * @param allDataSets : Element that includes all dataset definitions.
820         * @param dsNameSpace : Data set name space
821         * @throws CoordinatorJobException thrown if failed to include one dataset file
822         */
823        @SuppressWarnings("unchecked")
824        private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
825        throws CoordinatorJobException {
826            Element tmpDataSets = null;
827            try {
828                String dsXml = readDefinition(incDSFile);
829                LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
830                tmpDataSets = XmlUtils.parseXml(dsXml);
831            }
832            catch (JDOMException e) {
833                LOG.warn("Error parsing included dataset [{0}].  Message [{1}]", incDSFile, e.getMessage());
834                throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
835            }
836            resolveDataSets(tmpDataSets.getChildren("dataset"));
837            for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
838                String dsName = e.getAttributeValue("name");
839                if (dsList.contains(dsName)) {
840                    throw new RuntimeException("Duplicate Dataset " + dsName);
841                }
842                dsList.add(dsName);
843                Element tmp = (Element) e.clone();
844                // TODO: Don't like to over-write the external/include DS's namespace
845                tmp.setNamespace(dsNameSpace);
846                tmp.getChild("uri-template").setNamespace(dsNameSpace);
847                if (e.getChild("done-flag") != null) {
848                    tmp.getChild("done-flag").setNamespace(dsNameSpace);
849                }
850                allDataSets.addContent(tmp);
851            }
852            // nested include
853            for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
854                String incFile = includeElem.getTextTrim();
855                includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
856            }
857        }
858    
859        /**
860         * Remove a dataset from a list of dataset.
861         *
862         * @param eDatasets : List of dataset
863         * @param name : Dataset name to be removed.
864         */
865        @SuppressWarnings("unchecked")
866        private static void removeDataSet(Element eDatasets, String name) {
867            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
868                if (eDataset.getAttributeValue("name").equals(name)) {
869                    eDataset.detach();
870                }
871            }
872            throw new RuntimeException("undefined dataset: " + name);
873        }
874    
875        /**
876         * Read coordinator definition.
877         *
878         * @param appPath application path.
879         * @return coordinator definition.
880         * @throws CoordinatorJobException thrown if the definition could not be read.
881         */
882        protected String readDefinition(String appPath) throws CoordinatorJobException {
883            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
884            String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
885            // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
886            try {
887                URI uri = new URI(appPath);
888                LOG.debug("user =" + user + " group =" + group);
889                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri,
890                        new Configuration());
891                Path appDefPath = null;
892    
893                // app path could be a directory
894                Path path = new Path(uri.getPath());
895                // check file exists for dataset include file, app xml already checked
896                if (!fs.exists(path)) {
897                    throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
898                }
899                if (!fs.isFile(path)) {
900                    appDefPath = new Path(path, COORDINATOR_XML_FILE);
901                } else {
902                    appDefPath = path;
903                }
904    
905                Reader reader = new InputStreamReader(fs.open(appDefPath));
906                StringWriter writer = new StringWriter();
907                IOUtils.copyCharStream(reader, writer);
908                return writer.toString();
909            }
910            catch (IOException ex) {
911                LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
912                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
913            }
914            catch (URISyntaxException ex) {
915                LOG.warn("URISyException :" + ex.getMessage());
916                throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
917            }
918            catch (HadoopAccessorException ex) {
919                throw new CoordinatorJobException(ex);
920            }
921            catch (Exception ex) {
922                LOG.warn("Exception :", ex);
923                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
924            }
925        }
926    
927        /**
928         * Write a coordinator job into database
929         *
930         * @param eJob : XML element of job
931         * @param coordJob : Coordinator job bean
932         * @return Job id
933         * @throws CommandException thrown if unable to save coordinator job to db
934         */
935        private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException {
936            String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
937            coordJob.setId(jobId);
938            coordJob.setAuthToken(this.authToken);
939    
940            coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
941            coordJob.setCreatedTime(new Date());
942            coordJob.setUser(conf.get(OozieClient.USER_NAME));
943            coordJob.setGroup(conf.get(OozieClient.GROUP_NAME));
944            coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
945            coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
946            coordJob.setLastActionNumber(0);
947            coordJob.setLastModifiedTime(new Date());
948    
949            if (!dryrun) {
950                coordJob.setLastModifiedTime(new Date());
951                try {
952                    jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
953                }
954                catch (JPAExecutorException je) {
955                    throw new CommandException(je);
956                }
957            }
958            return jobId;
959        }
960    
961        /* (non-Javadoc)
962         * @see org.apache.oozie.command.XCommand#getEntityKey()
963         */
964        @Override
965        protected String getEntityKey() {
966            return null;
967        }
968    
969        /* (non-Javadoc)
970         * @see org.apache.oozie.command.XCommand#isLockRequired()
971         */
972        @Override
973        protected boolean isLockRequired() {
974            return false;
975        }
976    
977        /* (non-Javadoc)
978         * @see org.apache.oozie.command.XCommand#loadState()
979         */
980        @Override
981        protected void loadState() throws CommandException {
982            jpaService = Services.get().get(JPAService.class);
983            if (jpaService == null) {
984                throw new CommandException(ErrorCode.E0610);
985            }
986            coordJob = new CoordinatorJobBean();
987            if (this.bundleId != null) {
988                // this coord job is created from bundle
989                coordJob.setBundleId(this.bundleId);
990                // first use bundle id if submit thru bundle
991                logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
992                LogUtils.setLogInfo(logInfo);
993            }
994            if (this.coordName != null) {
995                // this coord job is created from bundle
996                coordJob.setAppName(this.coordName);
997            }
998            setJob(coordJob);
999    
1000        }
1001    
1002        /* (non-Javadoc)
1003         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
1004         */
1005        @Override
1006        protected void verifyPrecondition() throws CommandException {
1007    
1008        }
1009    
1010        /* (non-Javadoc)
1011         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1012         */
1013        @Override
1014        public void notifyParent() throws CommandException {
1015            // update bundle action
1016            if (coordJob.getBundleId() != null) {
1017                LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1018                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1019                bundleStatusUpdate.call();
1020            }
1021        }
1022    
1023        /* (non-Javadoc)
1024         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
1025         */
1026        @Override
1027        public void updateJob() throws CommandException {
1028        }
1029    
1030        /* (non-Javadoc)
1031         * @see org.apache.oozie.command.TransitionXCommand#getJob()
1032         */
1033        @Override
1034        public Job getJob() {
1035            return coordJob;
1036        }
1037    }