001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import java.io.IOException;
021    import java.io.InputStreamReader;
022    import java.io.Reader;
023    import java.io.StringReader;
024    import java.io.StringWriter;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.ArrayList;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.List;
032    import java.util.Set;
033    import java.util.TreeSet;
034    
035    import javax.xml.transform.stream.StreamSource;
036    import javax.xml.validation.Validator;
037    
038    import org.apache.hadoop.conf.Configuration;
039    import org.apache.hadoop.fs.FileSystem;
040    import org.apache.hadoop.fs.Path;
041    import org.apache.oozie.CoordinatorJobBean;
042    import org.apache.oozie.ErrorCode;
043    import org.apache.oozie.client.CoordinatorJob;
044    import org.apache.oozie.client.OozieClient;
045    import org.apache.oozie.client.CoordinatorJob.Execution;
046    import org.apache.oozie.command.CommandException;
047    import org.apache.oozie.coord.CoordELEvaluator;
048    import org.apache.oozie.coord.CoordELFunctions;
049    import org.apache.oozie.coord.CoordUtils;
050    import org.apache.oozie.coord.CoordinatorJobException;
051    import org.apache.oozie.coord.TimeUnit;
052    import org.apache.oozie.service.DagXLogInfoService;
053    import org.apache.oozie.service.HadoopAccessorException;
054    import org.apache.oozie.service.SchemaService;
055    import org.apache.oozie.service.Service;
056    import org.apache.oozie.service.Services;
057    import org.apache.oozie.service.UUIDService;
058    import org.apache.oozie.service.HadoopAccessorService;
059    import org.apache.oozie.service.WorkflowAppService;
060    import org.apache.oozie.service.SchemaService.SchemaName;
061    import org.apache.oozie.service.UUIDService.ApplicationType;
062    import org.apache.oozie.store.CoordinatorStore;
063    import org.apache.oozie.store.StoreException;
064    import org.apache.oozie.util.DateUtils;
065    import org.apache.oozie.util.ELEvaluator;
066    import org.apache.oozie.util.IOUtils;
067    import org.apache.oozie.util.ParamChecker;
068    import org.apache.oozie.util.PropertiesUtils;
069    import org.apache.oozie.util.XConfiguration;
070    import org.apache.oozie.util.XLog;
071    import org.apache.oozie.util.XmlUtils;
072    import org.apache.oozie.workflow.WorkflowException;
073    import org.jdom.Attribute;
074    import org.jdom.Element;
075    import org.jdom.JDOMException;
076    import org.jdom.Namespace;
077    import org.xml.sax.SAXException;
078    
079    /**
080     * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB
081     * table. <p/> Specifically it performs the following functions: 1. Resolve all the variables or properties using job
082     * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML
083     * at runtime.
084     */
085    public class CoordSubmitCommand extends CoordinatorCommand<String> {
086    
    /**
     * Job configuration supplied by the submitter. {@link #mergeDefaultConfig()} injects the values of the
     * application's default configuration file into this same object.
     */
    private Configuration conf;
    /** Authentication token given to the constructor, where it is checked to be non-empty. */
    private String authToken;
    /**
     * When true, {@code call()} does not queue a materialization command; it materializes the actions itself and
     * returns the job XML followed by those actions.
     */
    private boolean dryrun;

    /** Name of the optional defaults file, looked up in the parent directory of the coordinator application path. */
    public static final String CONFIG_DEFAULT = "coord-config-default.xml";
    /**
     * File name of a coordinator definition. NOTE(review): not referenced in the part of the class reviewed here;
     * presumably used when the definition is read -- confirm.
     */
    public static final String COORDINATOR_XML_FILE = "coordinator.xml";

    /** Property names checked against the job configuration in {@link #mergeDefaultConfig()}; filled by the static initializer. */
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
    /**
     * Property names checked against the default configuration file in {@link #mergeDefaultConfig()}; the static
     * initializer fills it from the user-level names plus the Hadoop user/UGI and Kerberos principal names.
     */
    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
    /**
     * Name of the server configuration property holding the default timeout for normal jobs, in minutes, after
     * which the coordinator input check times out. It is read when the job XML resolves no {@code <timeout>} value.
     */
    public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";

    /** Logger for this command; {@code call()} fetches it again after the job's log info has been set. */
    private XLog log = XLog.getLog(getClass());
    /** Evaluator for the "coord-job-submit-freq" group; used for frequency attributes. Set by {@link #initEvaluators()}. */
    private ELEvaluator evalFreq = null;
    /** Evaluator for the "coord-job-submit-nofuncs" group; used for most attributes and tags. Set by {@link #initEvaluators()}. */
    private ELEvaluator evalNofuncs = null;
    /** Evaluator for workflow configuration properties; created in {@code resolveInitial} once the event names are known. */
    private ELEvaluator evalData = null;
    /** Evaluator for the "coord-job-submit-instances" group; used for instance tags. Set by {@link #initEvaluators()}. */
    private ELEvaluator evalInst = null;
    /** Evaluator for the "coord-sla-submit" group; used for the SLA block. Set by {@link #initEvaluators()}. */
    private ELEvaluator evalSla = null;
107    
108        static {
109            String[] badUserProps = {PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
110                    PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
111                    PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
112                    PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
113                    PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
114            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
115    
116            String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
117                    WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME};
118            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
119            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
120        }
121    
122        /**
123         * Constructor to create the Coordinator Submit Command.
124         *
125         * @param conf : Configuration for Coordinator job
126         * @param authToken : To be used for authentication
127         */
128        public CoordSubmitCommand(Configuration conf, String authToken) {
129            super("coord_submit", "coord_submit", 1, XLog.STD);
130            this.conf = ParamChecker.notNull(conf, "conf");
131            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
132        }
133    
134        public CoordSubmitCommand(boolean dryrun, Configuration conf, String authToken) {
135            super("coord_submit", "coord_submit", 1, XLog.STD, dryrun);
136            this.conf = ParamChecker.notNull(conf, "conf");
137            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
138            this.dryrun = dryrun;
139            // TODO Auto-generated constructor stub
140        }
141    
142        /*
143         * (non-Javadoc)
144         * 
145         * @see org.apache.oozie.command.Command#call(org.apache.oozie.store.Store)
146         */
147        @Override
148        protected String call(CoordinatorStore store) throws StoreException, CommandException {
149            String jobId = null;
150            log.info("STARTED Coordinator Submit");
151            incrJobCounter(1);
152            CoordinatorJobBean coordJob = new CoordinatorJobBean();
153            try {
154                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
155                mergeDefaultConfig();
156    
157                String appXml = readAndValidateXml();
158                coordJob.setOrigJobXml(appXml);
159                log.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
160                appXml = XmlUtils.removeComments(appXml);
161                initEvaluators();
162                Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
163                log.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
164    
165                jobId = storeToDB(eJob, store, coordJob);
166                // log JOB info for coordinator jobs
167                setLogInfo(coordJob);
168                log = XLog.getLog(getClass());
169    
170                if (!dryrun) {
171                    // submit a command to materialize jobs for the next 1 hour (3600 secs)
172                    // so we don't wait 10 mins for the Service to run.
173                    queueCallable(new CoordJobMatLookupCommand(jobId, 3600), 100);
174                }
175                else {
176                    Date startTime = coordJob.getStartTime();
177                    long startTimeMilli = startTime.getTime();
178                    long endTimeMilli = startTimeMilli + (3600 * 1000);
179                    Date jobEndTime = coordJob.getEndTime();
180                    Date endTime = new Date(endTimeMilli);
181                    if (endTime.compareTo(jobEndTime) > 0) {
182                        endTime = jobEndTime;
183                    }
184                    jobId = coordJob.getId();
185                    log.info("[" + jobId + "]: Update status to PREMATER");
186                    coordJob.setStatus(CoordinatorJob.Status.PREMATER);
187                    CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
188                                                                                                        endTime);
189                    Configuration jobConf = null;
190                    try {
191                        jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
192                    }
193                    catch (IOException e1) {
194                        log.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
195                    }
196                    String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
197                    String output = coordJob.getJobXml() + System.getProperty("line.separator")
198                            + "***actions for instance***" + action;
199                    return output;
200                }
201            }
202            catch (CoordinatorJobException ex) {
203                log.warn("ERROR:  ", ex);
204                throw new CommandException(ex);
205            }
206            catch (IllegalArgumentException iex) {
207                log.warn("ERROR:  ", iex);
208                throw new CommandException(ErrorCode.E1003, iex);
209            }
210            catch (Exception ex) {// TODO
211                log.warn("ERROR:  ", ex);
212                throw new CommandException(ErrorCode.E0803, ex);
213            }
214            log.info("ENDED Coordinator Submit jobId=" + jobId);
215            return jobId;
216        }
217    
218        /**
219         * Read the application XML and validate against coordinator Schema
220         *
221         * @return validated coordinator XML
222         * @throws CoordinatorJobException
223         */
224        private String readAndValidateXml() throws CoordinatorJobException {
225            String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
226                                                   OozieClient.COORDINATOR_APP_PATH);// TODO: COORDINATOR_APP_PATH
227            String coordXml = readDefinition(appPath);
228            validateXml(coordXml);
229            return coordXml;
230        }
231    
232        /**
233         * Validate against Coordinator XSD file
234         *
235         * @param xmlContent : Input coordinator xml
236         * @throws CoordinatorJobException
237         */
238        private void validateXml(String xmlContent) throws CoordinatorJobException {
239            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
240            Validator validator = schema.newValidator();
241            // log.warn("XML " + xmlContent);
242            try {
243                validator.validate(new StreamSource(new StringReader(xmlContent)));
244            }
245            catch (SAXException ex) {
246                log.warn("SAXException :", ex);
247                throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
248            }
249            catch (IOException ex) {
250                // ex.printStackTrace();
251                log.warn("IOException :", ex);
252                throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
253            }
254        }
255    
256        /**
257         * Merge default configuration with user-defined configuration.
258         *
259         * @throws CommandException
260         */
261        protected void mergeDefaultConfig() throws CommandException {
262            Path coordAppDir = new Path(conf.get(OozieClient.COORDINATOR_APP_PATH)).getParent();
263            Path configDefault = new Path(coordAppDir, CONFIG_DEFAULT);
264            // Configuration fsConfig = new Configuration();
265            // log.warn("CONFIG :" + configDefault.toUri());
266            Configuration fsConfig = CoordUtils.getHadoopConf(conf);
267            FileSystem fs;
268            // TODO: which conf?
269            try {
270                String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
271                String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
272                fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, configDefault.toUri(),
273                                                                                      conf);
274                if (fs.exists(configDefault)) {
275                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
276                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
277                    XConfiguration.injectDefaults(defaultConf, conf);
278                }
279                else {
280                    log.info("configDefault Doesn't exist " + configDefault);
281                }
282                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
283            }
284            catch (IOException e) {
285                throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
286                        + configDefault, e);
287            }
288            catch (HadoopAccessorException e) {
289                throw new CommandException(e);
290            }
291            log.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
292        }
293    
294        /**
295         * The method resolve all the variables that are defined in configuration. It also include the data set definition
296         * from dataset file into XML.
297         *
298         * @param appXml : Original job XML
299         * @param conf : Configuration of the job
300         * @param coordJob : Coordinator job bean to be populated.
301         * @return : Resolved and modified job XML element.
302         * @throws Exception
303         */
304        public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
305                throws CoordinatorJobException, Exception {
306            Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
307            includeDataSets(basicResolvedApp, conf);
308            return basicResolvedApp;
309        }
310    
311        /**
312         * Insert data set into data-in and data-out tags.
313         *
314         * @param eAppXml : coordinator application XML
315         * @param eDatasets : DataSet XML
316         * @return updated application
317         */
318        private void insertDataSet(Element eAppXml, Element eDatasets) {
319            // Adding DS definition in the coordinator XML
320            Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
321            if (inputList != null) {
322                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
323                    Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
324                    dataIn.getContent().add(0, eDataset);
325                }
326            }
327            Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
328            if (outputList != null) {
329                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
330                    Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
331                    dataOut.getContent().add(0, eDataset);
332                }
333            }
334        }
335    
336        /**
337         * Find a specific dataset from a list of Datasets.
338         *
339         * @param eDatasets : List of data sets
340         * @param name : queried data set name
341         * @return one Dataset element. otherwise throw Exception
342         */
343        private static Element findDataSet(Element eDatasets, String name) {
344            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
345                if (eDataset.getAttributeValue("name").equals(name)) {
346                    eDataset = (Element) eDataset.clone();
347                    eDataset.detach();
348                    return eDataset;
349                }
350            }
351            throw new RuntimeException("undefined dataset: " + name);
352        }
353    
    /**
     * Creates the EL evaluators for the frequency, no-functions, instances and SLA groups from the job configuration.
     * <p/>
     * The data evaluator ({@code evalData}) is not created here; {@code resolveInitial} creates it once the names of
     * the input and output events have been collected.
     */
    protected void initEvaluators() {
        evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
        evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
        evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
        evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
    }
363    
364        /**
365         * Resolve basic entities using job Configuration.
366         *
367         * @param conf :Job configuration
368         * @param appXml : Original job XML
369         * @param coordJob : Coordinator job bean to be populated.
370         * @return Resolved job XML element.
371         * @throws Exception
372         */
373        protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
374                throws CoordinatorJobException, Exception {
375            Element eAppXml = XmlUtils.parseXml(appXml);
376            // job's main attributes
377            // frequency
378            String val = resolveAttribute("frequency", eAppXml, evalFreq);
379            int ival = ParamChecker.checkInteger(val, "frequency");
380            ParamChecker.checkGTZero(ival, "frequency");
381            coordJob.setFrequency(ival);
382            TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
383                    .getVariable("timeunit"));
384            addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); // TODO: Store
385            // TimeUnit
386            coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
387            // End Of Duration
388            tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
389                    .getVariable("endOfDuration"));
390            addAnAttribute("end_of_duration", eAppXml, tmp.toString());
391            // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
392    
393            // start time
394            val = resolveAttribute("start", eAppXml, evalNofuncs);
395            ParamChecker.checkUTC(val, "start");
396            coordJob.setStartTime(DateUtils.parseDateUTC(val));
397            // end time
398            val = resolveAttribute("end", eAppXml, evalNofuncs);
399            ParamChecker.checkUTC(val, "end");
400            coordJob.setEndTime(DateUtils.parseDateUTC(val));
401            // Time zone
402            val = resolveAttribute("timezone", eAppXml, evalNofuncs);
403            ParamChecker.checkTimeZone(val, "timezone");
404            coordJob.setTimeZone(val);
405    
406            // controls
407            val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
408            if (val == "") {
409                val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
410            }
411    
412            ival = ParamChecker.checkInteger(val, "timeout");
413            // ParamChecker.checkGEZero(ival, "timeout");
414            coordJob.setTimeout(ival);
415            val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
416            if (val == "") {
417                val = "-1";
418            }
419            ival = ParamChecker.checkInteger(val, "concurrency");
420            // ParamChecker.checkGEZero(ival, "concurrency");
421            coordJob.setConcurrency(ival);
422            val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
423            if (val == "") {
424                val = Execution.FIFO.toString();
425            }
426            coordJob.setExecution(Execution.valueOf(val));
427            String[] acceptedVals = {Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString()};
428            ParamChecker.isMember(val, acceptedVals, "execution");
429    
430            // datasets
431            resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
432            // for each data set
433            resolveDataSets(eAppXml);
434            HashMap<String, String> dataNameList = new HashMap<String, String>();
435            resolveIOEvents(eAppXml, dataNameList);
436    
437            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
438                                                                                                       eAppXml.getNamespace()), evalNofuncs);
439            // TODO: If action or workflow tag is missing, NullPointerException will
440            // occur
441            Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
442                                                                                             eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
443            evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
444            if (configElem != null) {
445                for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
446                    resolveTagContents("name", propElem, evalData);
447                    // log.warn("Value :");
448                    // Want to check the data-integrity but don't want to modify the
449                    // XML
450                    // for properties only
451                    Element tmpProp = (Element) propElem.clone();
452                    resolveTagContents("value", tmpProp, evalData);
453                    // val = resolveTagContents("value", propElem, evalData);
454                    // log.warn("Value OK :" + val);
455                }
456            }
457            resolveSLA(eAppXml, coordJob);
458            return eAppXml;
459        }
460    
461        private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
462            // String prefix = XmlUtils.getNamespacePrefix(eAppXml,
463            // SchemaService.SLA_NAME_SPACE_URI);
464            Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info",
465                                                                                       Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
466    
467            if (eSla != null) {
468                String slaXml = XmlUtils.prettyPrint(eSla).toString();
469                try {
470                    // EL evaluation
471                    slaXml = evalSla.evaluate(slaXml, String.class);
472                    // Validate against semantic SXD
473                    XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
474                }
475                catch (Exception e) {
476                    throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
477                }
478            }
479        }
480    
481        /**
482         * Resolve input-events/data-in and output-events/data-out tags.
483         *
484         * @param eJob : Job element
485         * @throws CoordinatorJobException
486         */
487        private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
488            // Resolving input-events/data-in
489            // Clone the job and don't update anything in the original
490            Element eJob = (Element) eJobOrg.clone();
491            Element inputList = eJob.getChild("input-events", eJob.getNamespace());
492            if (inputList != null) {
493                TreeSet<String> eventNameSet = new TreeSet<String>();
494                for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
495                    String dataInName = dataIn.getAttributeValue("name");
496                    dataNameList.put(dataInName, "data-in");
497                    // check whether there is any duplicate data-in name
498                    if (eventNameSet.contains(dataInName)) {
499                        throw new RuntimeException("Duplicate dataIn name " + dataInName);
500                    }
501                    else {
502                        eventNameSet.add(dataInName);
503                    }
504                    resolveTagContents("instance", dataIn, evalInst);
505                    resolveTagContents("start-instance", dataIn, evalInst);
506                    resolveTagContents("end-instance", dataIn, evalInst);
507                }
508            }
509            // Resolving output-events/data-out
510            Element outputList = eJob.getChild("output-events", eJob.getNamespace());
511            if (outputList != null) {
512                TreeSet<String> eventNameSet = new TreeSet<String>();
513                for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
514                    String dataOutName = dataOut.getAttributeValue("name");
515                    dataNameList.put(dataOutName, "data-out");
516                    // check whether there is any duplicate data-out name
517                    if (eventNameSet.contains(dataOutName)) {
518                        throw new RuntimeException("Duplicate dataIn name " + dataOutName);
519                    }
520                    else {
521                        eventNameSet.add(dataOutName);
522                    }
523                    resolveTagContents("instance", dataOut, evalInst);
524                }
525            }
526    
527        }
528    
    /**
     * Sets an attribute on an XML element by delegating to {@code Element.setAttribute(name, value)}.
     *
     * @param attrName name of the attribute
     * @param elem element the attribute is set on
     * @param value value of the attribute
     */
    private void addAnAttribute(String attrName, Element elem, String value) {
        elem.setAttribute(attrName, value);
    }
539    
540        /**
541         * Resolve Data set using job configuration.
542         *
543         * @param eAppXml : Job Element XML
544         * @throws Exception
545         */
546        private void resolveDataSets(Element eAppXml) throws Exception {
547            Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
548            if (datasetList != null) {
549    
550                List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
551                resolveDataSets(dsElems);
552                resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
553                                                                                                           eAppXml.getNamespace()), evalNofuncs);
554            }
555        }
556    
557        /**
558         * Resolve Data set using job configuration.
559         *
560         * @param dsElems : Data set XML element.
561         * @throws CoordinatorJobException
562         * @throws Exception
563         */
564        private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException /*
565                                                                                            * throws
566                                                                                            * Exception
567                                                                                            */ {
568            for (Element dsElem : dsElems) {
569                // Setting up default TimeUnit and EndOFDuraion
570                evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
571                evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
572    
573                String val = resolveAttribute("frequency", dsElem, evalFreq);
574                int ival = ParamChecker.checkInteger(val, "frequency");
575                ParamChecker.checkGTZero(ival, "frequency");
576                addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
577                        .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
578                addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
579                        .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
580                val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
581                ParamChecker.checkUTC(val, "initial-instance");
582                val = resolveAttribute("timezone", dsElem, evalNofuncs);
583                ParamChecker.checkTimeZone(val, "timezone");
584                resolveTagContents("uri-template", dsElem, evalNofuncs);
585                resolveTagContents("done-flag", dsElem, evalNofuncs);
586            }
587        }
588    
589        /**
590         * Resolve the content of a tag.
591         *
592         * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
593         * @param elem : Element where the tag exists.
594         * @param eval :
595         * @return Resolved tag content.
596         * @throws CoordinatorJobException
597         */
598        private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
599            String ret = "";
600            if (elem != null) {
601                for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
602                    if (tagElem != null) {
603                        String updated;
604                        try {
605                            updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
606    
607                        }
608                        catch (Exception e) {
609                            // e.printStackTrace();
610                            throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
611                        }
612                        tagElem.removeContent();
613                        tagElem.addContent(updated);
614                        ret += updated;
615                    }
616                    /*
617                     * else { //TODO: unlike event }
618                     */
619                }
620            }
621            return ret;
622        }
623    
624        /**
625         * Resolve an attribute value.
626         *
627         * @param attrName : Attribute name.
628         * @param elem : XML Element where attribute is defiend
629         * @param eval : ELEvaluator used to resolve
630         * @return Resolved attribute value
631         * @throws CoordinatorJobException
632         */
633        private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
634            Attribute attr = elem.getAttribute(attrName);
635            String val = null;
636            if (attr != null) {
637                try {
638                    val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
639    
640                }
641                catch (Exception e) {
642                    // e.printStackTrace();
643                    throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
644                }
645                attr.setValue(val);
646            }
647            return val;
648        }
649    
650        /**
651         * Include referred Datasets into XML.
652         *
653         * @param resolvedXml : Job XML element.
654         * @param conf : Job configuration
655         * @throws CoordinatorJobException
656         */
657        protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException
658            /* throws Exception */ {
659            Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
660            Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
661            List<String> dsList = new ArrayList<String>();
662            if (datasets != null) {
663                for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
664                    String incDSFile = includeElem.getTextTrim();
665                    // log.warn(" incDSFile " + incDSFile);
666                    includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
667                }
668                for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
669                    String dsName = (String) e.getAttributeValue("name");
670                    if (dsList.contains(dsName)) {// Override with this DS
671                        // Remove old DS
672                        removeDataSet(allDataSets, dsName);
673                        // throw new RuntimeException("Duplicate Dataset " +
674                        // dsName);
675                    }
676                    else {
677                        dsList.add(dsName);
678                    }
679                    allDataSets.addContent((Element) e.clone());
680                }
681            }
682            insertDataSet(resolvedXml, allDataSets);
683            resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
684        }
685    
686        /**
687         * Include One Dataset file.
688         *
689         * @param incDSFile : Include data set filename.
690         * @param dsList :List of dataset names to verify the duplicate.
691         * @param allDataSets : Element that includes all dataset definitions.
692         * @param dsNameSpace : Data set name space
693         * @throws CoordinatorJobException
694         * @throws Exception
695         */
696        private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
697                throws CoordinatorJobException {
698            Element tmpDataSets = null;
699            try {
700                String dsXml = readDefinition(incDSFile);
701                log.debug("DSFILE :" + incDSFile + "\n" + dsXml);
702                tmpDataSets = XmlUtils.parseXml(dsXml);
703            }
704            /*
705             * catch (IOException iex) {XLog.getLog(getClass()).warn(
706             * "Error reading included dataset file [{0}].  Message [{1}]",
707             * incDSFile, iex.getMessage()); throw new
708             * CommandException(ErrorCode.E0803, iex.getMessage()); }
709             */
710            catch (JDOMException e) {
711                log.warn("Error parsing included dataset [{0}].  Message [{1}]", incDSFile, e.getMessage());
712                throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
713            }
714            resolveDataSets((List<Element>) tmpDataSets.getChildren("dataset"));
715            for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
716                String dsName = (String) e.getAttributeValue("name");
717                if (dsList.contains(dsName)) {
718                    throw new RuntimeException("Duplicate Dataset " + dsName);
719                }
720                dsList.add(dsName);
721                Element tmp = (Element) e.clone();
722                // TODO: Don't like to over-write the external/include DS's
723                // namespace
724                tmp.setNamespace(dsNameSpace);// TODO:
725                tmp.getChild("uri-template").setNamespace(dsNameSpace);
726                if (e.getChild("done-flag") != null) {
727                    tmp.getChild("done-flag").setNamespace(dsNameSpace);
728                }
729                allDataSets.addContent(tmp);
730            }
731            // nested include
732            for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
733                String incFile = includeElem.getTextTrim();
734                // log.warn("incDSFile "+ incDSFile);
735                includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
736            }
737        }
738    
739        /**
740         * Remove a dataset from a list of dataset.
741         *
742         * @param eDatasets : List of dataset
743         * @param name : Dataset name to be removed.
744         */
745        private static void removeDataSet(Element eDatasets, String name) {
746            for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
747                if (eDataset.getAttributeValue("name").equals(name)) {
748                    eDataset.detach();
749                }
750            }
751            throw new RuntimeException("undefined dataset: " + name);
752        }
753    
754        /**
755         * Read workflow definition.
756         *
757         * @param appPath application path.
758         * @param user user name.
759         * @param group group name.
760         * @param autToken authentication token.
761         * @return workflow definition.
762         * @throws WorkflowException thrown if the definition could not be read.
763         */
764        protected String readDefinition(String appPath) throws CoordinatorJobException {
765            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
766            String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
767            Configuration confHadoop = CoordUtils.getHadoopConf(conf);
768            try {
769                URI uri = new URI(appPath);
770                log.debug("user =" + user + " group =" + group);
771                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri, conf);
772                Path p = new Path(uri.getPath());
773    
774                // Reader reader = new InputStreamReader(fs.open(new Path(uri
775                // .getPath(), fileName)));
776                Reader reader = new InputStreamReader(fs.open(p));// TODO
777                StringWriter writer = new StringWriter();
778                IOUtils.copyCharStream(reader, writer);
779                return writer.toString();
780            }
781            catch (IOException ex) {
782                log.warn("IOException :" + XmlUtils.prettyPrint(confHadoop), ex);
783                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); // TODO:
784            }
785            catch (URISyntaxException ex) {
786                log.warn("URISyException :" + ex.getMessage());
787                throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);// TODO:
788            }
789            catch (HadoopAccessorException ex) {
790                throw new CoordinatorJobException(ex);
791            }
792            catch (Exception ex) {
793                log.warn("Exception :", ex);
794                throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);// TODO:
795            }
796        }
797    
798        /**
799         * Write a Coordinator Job into database
800         *
801         * @param eJob : XML element of job
802         * @param store : Coordinator Store to write.
803         * @param coordJob : Coordinator job bean
804         * @return Job if.
805         * @throws StoreException
806         */
807        private String storeToDB(Element eJob, CoordinatorStore store, CoordinatorJobBean coordJob) throws StoreException {
808            String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
809            coordJob.setId(jobId);
810            coordJob.setAuthToken(this.authToken);
811            coordJob.setAppName(eJob.getAttributeValue("name"));
812            coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
813            coordJob.setStatus(CoordinatorJob.Status.PREP);
814            coordJob.setCreatedTime(new Date()); // TODO: Do we need that?
815            coordJob.setUser(conf.get(OozieClient.USER_NAME));
816            coordJob.setGroup(conf.get(OozieClient.GROUP_NAME));
817            coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
818            coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
819            coordJob.setLastActionNumber(0);
820            coordJob.setLastModifiedTime(new Date());
821    
822            if (!dryrun) {
823                store.insertCoordinatorJob(coordJob);
824            }
825            return jobId;
826        }
827    
828        /**
829         * For unit-testing only. Will ultimately go away
830         *
831         * @param args
832         * @throws Exception
833         * @throws JDOMException
834         */
835        public static void main(String[] args) throws Exception {
836            // TODO Auto-generated method stub
837            // Configuration conf = new XConfiguration(IOUtils.getResourceAsReader(
838            // "org/apache/oozie/coord/conf.xml", -1));
839    
840            Configuration conf = new XConfiguration();
841    
842            // base case
843            // conf.set(OozieClient.COORDINATOR_APP_PATH,
844            // "file:///Users/danielwo/oozie/workflows/coord/test1/");
845    
846            // no input datasets
847            // conf.set(OozieClient.COORDINATOR_APP_PATH,
848            // "file:///Users/danielwo/oozie/workflows/coord/coord_noinput/");
849            // conf.set(OozieClient.COORDINATOR_APP_PATH,
850            // "file:///Users/danielwo/oozie/workflows/coord/coord_use_apppath/");
851    
852            // only 1 instance
853            // conf.set(OozieClient.COORDINATOR_APP_PATH,
854            // "file:///Users/danielwo/oozie/workflows/coord/coord_oneinstance/");
855    
856            // no local props in xml
857            // conf.set(OozieClient.COORDINATOR_APP_PATH,
858            // "file:///Users/danielwo/oozie/workflows/coord/coord_noprops/");
859    
860            conf.set(OozieClient.COORDINATOR_APP_PATH,
861                     "file:///homes/test/workspace/sandbox_krishna/oozie-main/core/src/main/java/org/apache/oozie/coord/");
862            conf.set(OozieClient.USER_NAME, "test");
863            // conf.set(OozieClient.USER_NAME, "danielwo");
864            conf.set(OozieClient.GROUP_NAME, "other");
865            // System.out.println("appXml :"+ appXml + "\n conf :"+ conf);
866            new Services().init();
867            try {
868                CoordSubmitCommand sc = new CoordSubmitCommand(conf, "TESTING");
869                String jobId = sc.call();
870                System.out.println("Job Id " + jobId);
871                Thread.sleep(80000);
872            }
873            finally {
874                Services.get().destroy();
875            }
876        }
877    }