This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.InputStreamReader;
022 import java.io.Reader;
023 import java.io.StringReader;
024 import java.io.StringWriter;
025 import java.net.URI;
026 import java.net.URISyntaxException;
027 import java.util.ArrayList;
028 import java.util.Date;
029 import java.util.HashMap;
030 import java.util.HashSet;
031 import java.util.List;
032 import java.util.Set;
033 import java.util.TreeSet;
034
035 import javax.xml.transform.stream.StreamSource;
036 import javax.xml.validation.Validator;
037
038 import org.apache.hadoop.conf.Configuration;
039 import org.apache.hadoop.fs.FileSystem;
040 import org.apache.hadoop.fs.Path;
041 import org.apache.oozie.CoordinatorJobBean;
042 import org.apache.oozie.ErrorCode;
043 import org.apache.oozie.client.CoordinatorJob;
044 import org.apache.oozie.client.OozieClient;
045 import org.apache.oozie.client.CoordinatorJob.Execution;
046 import org.apache.oozie.command.CommandException;
047 import org.apache.oozie.coord.CoordELEvaluator;
048 import org.apache.oozie.coord.CoordELFunctions;
049 import org.apache.oozie.coord.CoordUtils;
050 import org.apache.oozie.coord.CoordinatorJobException;
051 import org.apache.oozie.coord.TimeUnit;
052 import org.apache.oozie.service.DagXLogInfoService;
053 import org.apache.oozie.service.HadoopAccessorException;
054 import org.apache.oozie.service.SchemaService;
055 import org.apache.oozie.service.Service;
056 import org.apache.oozie.service.Services;
057 import org.apache.oozie.service.UUIDService;
058 import org.apache.oozie.service.HadoopAccessorService;
059 import org.apache.oozie.service.WorkflowAppService;
060 import org.apache.oozie.service.SchemaService.SchemaName;
061 import org.apache.oozie.service.UUIDService.ApplicationType;
062 import org.apache.oozie.store.CoordinatorStore;
063 import org.apache.oozie.store.StoreException;
064 import org.apache.oozie.util.DateUtils;
065 import org.apache.oozie.util.ELEvaluator;
066 import org.apache.oozie.util.IOUtils;
067 import org.apache.oozie.util.ParamChecker;
068 import org.apache.oozie.util.PropertiesUtils;
069 import org.apache.oozie.util.XConfiguration;
070 import org.apache.oozie.util.XLog;
071 import org.apache.oozie.util.XmlUtils;
072 import org.apache.oozie.workflow.WorkflowException;
073 import org.jdom.Attribute;
074 import org.jdom.Element;
075 import org.jdom.JDOMException;
076 import org.jdom.Namespace;
077 import org.xml.sax.SAXException;
078
079 /**
080 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB
081 * table. <p/> Specifically it performs the following functions: 1. Resolve all the variables or properties using job
082 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML
083 * at runtime.
084 */
085 public class CoordSubmitCommand extends CoordinatorCommand<String> {
086
087 private Configuration conf;
088 private String authToken;
089 private boolean dryrun;
090
091 public static final String CONFIG_DEFAULT = "coord-config-default.xml";
092 public static final String COORDINATOR_XML_FILE = "coordinator.xml";
093
094 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
095 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
096 /**
097 * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
098 */
099 public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
100
101 private XLog log = XLog.getLog(getClass());
102 private ELEvaluator evalFreq = null;
103 private ELEvaluator evalNofuncs = null;
104 private ELEvaluator evalData = null;
105 private ELEvaluator evalInst = null;
106 private ELEvaluator evalSla = null;
107
108 static {
109 String[] badUserProps = {PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
110 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
111 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
112 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
113 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
114 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
115
116 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
117 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME};
118 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
119 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
120 }
121
122 /**
123 * Constructor to create the Coordinator Submit Command.
124 *
125 * @param conf : Configuration for Coordinator job
126 * @param authToken : To be used for authentication
127 */
128 public CoordSubmitCommand(Configuration conf, String authToken) {
129 super("coord_submit", "coord_submit", 1, XLog.STD);
130 this.conf = ParamChecker.notNull(conf, "conf");
131 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
132 }
133
134 public CoordSubmitCommand(boolean dryrun, Configuration conf, String authToken) {
135 super("coord_submit", "coord_submit", 1, XLog.STD, dryrun);
136 this.conf = ParamChecker.notNull(conf, "conf");
137 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
138 this.dryrun = dryrun;
139 // TODO Auto-generated constructor stub
140 }
141
142 /*
143 * (non-Javadoc)
144 *
145 * @see org.apache.oozie.command.Command#call(org.apache.oozie.store.Store)
146 */
147 @Override
148 protected String call(CoordinatorStore store) throws StoreException, CommandException {
149 String jobId = null;
150 log.info("STARTED Coordinator Submit");
151 incrJobCounter(1);
152 CoordinatorJobBean coordJob = new CoordinatorJobBean();
153 try {
154 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
155 mergeDefaultConfig();
156
157 String appXml = readAndValidateXml();
158 coordJob.setOrigJobXml(appXml);
159 log.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
160 appXml = XmlUtils.removeComments(appXml);
161 initEvaluators();
162 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
163 log.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
164
165 jobId = storeToDB(eJob, store, coordJob);
166 // log JOB info for coordinator jobs
167 setLogInfo(coordJob);
168 log = XLog.getLog(getClass());
169
170 if (!dryrun) {
171 // submit a command to materialize jobs for the next 1 hour (3600 secs)
172 // so we don't wait 10 mins for the Service to run.
173 queueCallable(new CoordJobMatLookupCommand(jobId, 3600), 100);
174 }
175 else {
176 Date startTime = coordJob.getStartTime();
177 long startTimeMilli = startTime.getTime();
178 long endTimeMilli = startTimeMilli + (3600 * 1000);
179 Date jobEndTime = coordJob.getEndTime();
180 Date endTime = new Date(endTimeMilli);
181 if (endTime.compareTo(jobEndTime) > 0) {
182 endTime = jobEndTime;
183 }
184 jobId = coordJob.getId();
185 log.info("[" + jobId + "]: Update status to PREMATER");
186 coordJob.setStatus(CoordinatorJob.Status.PREMATER);
187 CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
188 endTime);
189 Configuration jobConf = null;
190 try {
191 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
192 }
193 catch (IOException e1) {
194 log.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
195 }
196 String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
197 String output = coordJob.getJobXml() + System.getProperty("line.separator")
198 + "***actions for instance***" + action;
199 return output;
200 }
201 }
202 catch (CoordinatorJobException ex) {
203 log.warn("ERROR: ", ex);
204 throw new CommandException(ex);
205 }
206 catch (IllegalArgumentException iex) {
207 log.warn("ERROR: ", iex);
208 throw new CommandException(ErrorCode.E1003, iex);
209 }
210 catch (Exception ex) {// TODO
211 log.warn("ERROR: ", ex);
212 throw new CommandException(ErrorCode.E0803, ex);
213 }
214 log.info("ENDED Coordinator Submit jobId=" + jobId);
215 return jobId;
216 }
217
218 /**
219 * Read the application XML and validate against coordinator Schema
220 *
221 * @return validated coordinator XML
222 * @throws CoordinatorJobException
223 */
224 private String readAndValidateXml() throws CoordinatorJobException {
225 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
226 OozieClient.COORDINATOR_APP_PATH);// TODO: COORDINATOR_APP_PATH
227 String coordXml = readDefinition(appPath);
228 validateXml(coordXml);
229 return coordXml;
230 }
231
232 /**
233 * Validate against Coordinator XSD file
234 *
235 * @param xmlContent : Input coordinator xml
236 * @throws CoordinatorJobException
237 */
238 private void validateXml(String xmlContent) throws CoordinatorJobException {
239 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
240 Validator validator = schema.newValidator();
241 // log.warn("XML " + xmlContent);
242 try {
243 validator.validate(new StreamSource(new StringReader(xmlContent)));
244 }
245 catch (SAXException ex) {
246 log.warn("SAXException :", ex);
247 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
248 }
249 catch (IOException ex) {
250 // ex.printStackTrace();
251 log.warn("IOException :", ex);
252 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
253 }
254 }
255
256 /**
257 * Merge default configuration with user-defined configuration.
258 *
259 * @throws CommandException
260 */
261 protected void mergeDefaultConfig() throws CommandException {
262 Path coordAppDir = new Path(conf.get(OozieClient.COORDINATOR_APP_PATH)).getParent();
263 Path configDefault = new Path(coordAppDir, CONFIG_DEFAULT);
264 // Configuration fsConfig = new Configuration();
265 // log.warn("CONFIG :" + configDefault.toUri());
266 Configuration fsConfig = CoordUtils.getHadoopConf(conf);
267 FileSystem fs;
268 // TODO: which conf?
269 try {
270 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
271 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
272 fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, configDefault.toUri(),
273 conf);
274 if (fs.exists(configDefault)) {
275 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
276 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
277 XConfiguration.injectDefaults(defaultConf, conf);
278 }
279 else {
280 log.info("configDefault Doesn't exist " + configDefault);
281 }
282 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
283 }
284 catch (IOException e) {
285 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
286 + configDefault, e);
287 }
288 catch (HadoopAccessorException e) {
289 throw new CommandException(e);
290 }
291 log.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
292 }
293
294 /**
295 * The method resolve all the variables that are defined in configuration. It also include the data set definition
296 * from dataset file into XML.
297 *
298 * @param appXml : Original job XML
299 * @param conf : Configuration of the job
300 * @param coordJob : Coordinator job bean to be populated.
301 * @return : Resolved and modified job XML element.
302 * @throws Exception
303 */
304 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
305 throws CoordinatorJobException, Exception {
306 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
307 includeDataSets(basicResolvedApp, conf);
308 return basicResolvedApp;
309 }
310
311 /**
312 * Insert data set into data-in and data-out tags.
313 *
314 * @param eAppXml : coordinator application XML
315 * @param eDatasets : DataSet XML
316 * @return updated application
317 */
318 private void insertDataSet(Element eAppXml, Element eDatasets) {
319 // Adding DS definition in the coordinator XML
320 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
321 if (inputList != null) {
322 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
323 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
324 dataIn.getContent().add(0, eDataset);
325 }
326 }
327 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
328 if (outputList != null) {
329 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
330 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
331 dataOut.getContent().add(0, eDataset);
332 }
333 }
334 }
335
336 /**
337 * Find a specific dataset from a list of Datasets.
338 *
339 * @param eDatasets : List of data sets
340 * @param name : queried data set name
341 * @return one Dataset element. otherwise throw Exception
342 */
343 private static Element findDataSet(Element eDatasets, String name) {
344 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
345 if (eDataset.getAttributeValue("name").equals(name)) {
346 eDataset = (Element) eDataset.clone();
347 eDataset.detach();
348 return eDataset;
349 }
350 }
351 throw new RuntimeException("undefined dataset: " + name);
352 }
353
354 /**
355 * Initialize all the required EL Evaluators.
356 */
357 protected void initEvaluators() {
358 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
359 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
360 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
361 evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
362 }
363
364 /**
365 * Resolve basic entities using job Configuration.
366 *
367 * @param conf :Job configuration
368 * @param appXml : Original job XML
369 * @param coordJob : Coordinator job bean to be populated.
370 * @return Resolved job XML element.
371 * @throws Exception
372 */
373 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
374 throws CoordinatorJobException, Exception {
375 Element eAppXml = XmlUtils.parseXml(appXml);
376 // job's main attributes
377 // frequency
378 String val = resolveAttribute("frequency", eAppXml, evalFreq);
379 int ival = ParamChecker.checkInteger(val, "frequency");
380 ParamChecker.checkGTZero(ival, "frequency");
381 coordJob.setFrequency(ival);
382 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
383 .getVariable("timeunit"));
384 addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); // TODO: Store
385 // TimeUnit
386 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
387 // End Of Duration
388 tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
389 .getVariable("endOfDuration"));
390 addAnAttribute("end_of_duration", eAppXml, tmp.toString());
391 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
392
393 // start time
394 val = resolveAttribute("start", eAppXml, evalNofuncs);
395 ParamChecker.checkUTC(val, "start");
396 coordJob.setStartTime(DateUtils.parseDateUTC(val));
397 // end time
398 val = resolveAttribute("end", eAppXml, evalNofuncs);
399 ParamChecker.checkUTC(val, "end");
400 coordJob.setEndTime(DateUtils.parseDateUTC(val));
401 // Time zone
402 val = resolveAttribute("timezone", eAppXml, evalNofuncs);
403 ParamChecker.checkTimeZone(val, "timezone");
404 coordJob.setTimeZone(val);
405
406 // controls
407 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
408 if (val == "") {
409 val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
410 }
411
412 ival = ParamChecker.checkInteger(val, "timeout");
413 // ParamChecker.checkGEZero(ival, "timeout");
414 coordJob.setTimeout(ival);
415 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
416 if (val == "") {
417 val = "-1";
418 }
419 ival = ParamChecker.checkInteger(val, "concurrency");
420 // ParamChecker.checkGEZero(ival, "concurrency");
421 coordJob.setConcurrency(ival);
422 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
423 if (val == "") {
424 val = Execution.FIFO.toString();
425 }
426 coordJob.setExecution(Execution.valueOf(val));
427 String[] acceptedVals = {Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString()};
428 ParamChecker.isMember(val, acceptedVals, "execution");
429
430 // datasets
431 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
432 // for each data set
433 resolveDataSets(eAppXml);
434 HashMap<String, String> dataNameList = new HashMap<String, String>();
435 resolveIOEvents(eAppXml, dataNameList);
436
437 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
438 eAppXml.getNamespace()), evalNofuncs);
439 // TODO: If action or workflow tag is missing, NullPointerException will
440 // occur
441 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
442 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
443 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
444 if (configElem != null) {
445 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
446 resolveTagContents("name", propElem, evalData);
447 // log.warn("Value :");
448 // Want to check the data-integrity but don't want to modify the
449 // XML
450 // for properties only
451 Element tmpProp = (Element) propElem.clone();
452 resolveTagContents("value", tmpProp, evalData);
453 // val = resolveTagContents("value", propElem, evalData);
454 // log.warn("Value OK :" + val);
455 }
456 }
457 resolveSLA(eAppXml, coordJob);
458 return eAppXml;
459 }
460
461 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
462 // String prefix = XmlUtils.getNamespacePrefix(eAppXml,
463 // SchemaService.SLA_NAME_SPACE_URI);
464 Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info",
465 Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
466
467 if (eSla != null) {
468 String slaXml = XmlUtils.prettyPrint(eSla).toString();
469 try {
470 // EL evaluation
471 slaXml = evalSla.evaluate(slaXml, String.class);
472 // Validate against semantic SXD
473 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
474 }
475 catch (Exception e) {
476 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
477 }
478 }
479 }
480
481 /**
482 * Resolve input-events/data-in and output-events/data-out tags.
483 *
484 * @param eJob : Job element
485 * @throws CoordinatorJobException
486 */
487 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
488 // Resolving input-events/data-in
489 // Clone the job and don't update anything in the original
490 Element eJob = (Element) eJobOrg.clone();
491 Element inputList = eJob.getChild("input-events", eJob.getNamespace());
492 if (inputList != null) {
493 TreeSet<String> eventNameSet = new TreeSet<String>();
494 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
495 String dataInName = dataIn.getAttributeValue("name");
496 dataNameList.put(dataInName, "data-in");
497 // check whether there is any duplicate data-in name
498 if (eventNameSet.contains(dataInName)) {
499 throw new RuntimeException("Duplicate dataIn name " + dataInName);
500 }
501 else {
502 eventNameSet.add(dataInName);
503 }
504 resolveTagContents("instance", dataIn, evalInst);
505 resolveTagContents("start-instance", dataIn, evalInst);
506 resolveTagContents("end-instance", dataIn, evalInst);
507 }
508 }
509 // Resolving output-events/data-out
510 Element outputList = eJob.getChild("output-events", eJob.getNamespace());
511 if (outputList != null) {
512 TreeSet<String> eventNameSet = new TreeSet<String>();
513 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
514 String dataOutName = dataOut.getAttributeValue("name");
515 dataNameList.put(dataOutName, "data-out");
516 // check whether there is any duplicate data-out name
517 if (eventNameSet.contains(dataOutName)) {
518 throw new RuntimeException("Duplicate dataIn name " + dataOutName);
519 }
520 else {
521 eventNameSet.add(dataOutName);
522 }
523 resolveTagContents("instance", dataOut, evalInst);
524 }
525 }
526
527 }
528
529 /**
530 * Add an attribute into XML element.
531 *
532 * @param attrName :attribute name
533 * @param elem : Element to add attribute
534 * @param value :Value of attribute
535 */
536 private void addAnAttribute(String attrName, Element elem, String value) {
537 elem.setAttribute(attrName, value);
538 }
539
540 /**
541 * Resolve Data set using job configuration.
542 *
543 * @param eAppXml : Job Element XML
544 * @throws Exception
545 */
546 private void resolveDataSets(Element eAppXml) throws Exception {
547 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
548 if (datasetList != null) {
549
550 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
551 resolveDataSets(dsElems);
552 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
553 eAppXml.getNamespace()), evalNofuncs);
554 }
555 }
556
557 /**
558 * Resolve Data set using job configuration.
559 *
560 * @param dsElems : Data set XML element.
561 * @throws CoordinatorJobException
562 * @throws Exception
563 */
564 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException /*
565 * throws
566 * Exception
567 */ {
568 for (Element dsElem : dsElems) {
569 // Setting up default TimeUnit and EndOFDuraion
570 evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
571 evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
572
573 String val = resolveAttribute("frequency", dsElem, evalFreq);
574 int ival = ParamChecker.checkInteger(val, "frequency");
575 ParamChecker.checkGTZero(ival, "frequency");
576 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
577 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
578 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
579 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
580 val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
581 ParamChecker.checkUTC(val, "initial-instance");
582 val = resolveAttribute("timezone", dsElem, evalNofuncs);
583 ParamChecker.checkTimeZone(val, "timezone");
584 resolveTagContents("uri-template", dsElem, evalNofuncs);
585 resolveTagContents("done-flag", dsElem, evalNofuncs);
586 }
587 }
588
589 /**
590 * Resolve the content of a tag.
591 *
592 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
593 * @param elem : Element where the tag exists.
594 * @param eval :
595 * @return Resolved tag content.
596 * @throws CoordinatorJobException
597 */
598 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
599 String ret = "";
600 if (elem != null) {
601 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
602 if (tagElem != null) {
603 String updated;
604 try {
605 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
606
607 }
608 catch (Exception e) {
609 // e.printStackTrace();
610 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
611 }
612 tagElem.removeContent();
613 tagElem.addContent(updated);
614 ret += updated;
615 }
616 /*
617 * else { //TODO: unlike event }
618 */
619 }
620 }
621 return ret;
622 }
623
624 /**
625 * Resolve an attribute value.
626 *
627 * @param attrName : Attribute name.
628 * @param elem : XML Element where attribute is defiend
629 * @param eval : ELEvaluator used to resolve
630 * @return Resolved attribute value
631 * @throws CoordinatorJobException
632 */
633 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
634 Attribute attr = elem.getAttribute(attrName);
635 String val = null;
636 if (attr != null) {
637 try {
638 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
639
640 }
641 catch (Exception e) {
642 // e.printStackTrace();
643 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
644 }
645 attr.setValue(val);
646 }
647 return val;
648 }
649
650 /**
651 * Include referred Datasets into XML.
652 *
653 * @param resolvedXml : Job XML element.
654 * @param conf : Job configuration
655 * @throws CoordinatorJobException
656 */
657 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException
658 /* throws Exception */ {
659 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
660 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
661 List<String> dsList = new ArrayList<String>();
662 if (datasets != null) {
663 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
664 String incDSFile = includeElem.getTextTrim();
665 // log.warn(" incDSFile " + incDSFile);
666 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
667 }
668 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
669 String dsName = (String) e.getAttributeValue("name");
670 if (dsList.contains(dsName)) {// Override with this DS
671 // Remove old DS
672 removeDataSet(allDataSets, dsName);
673 // throw new RuntimeException("Duplicate Dataset " +
674 // dsName);
675 }
676 else {
677 dsList.add(dsName);
678 }
679 allDataSets.addContent((Element) e.clone());
680 }
681 }
682 insertDataSet(resolvedXml, allDataSets);
683 resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
684 }
685
686 /**
687 * Include One Dataset file.
688 *
689 * @param incDSFile : Include data set filename.
690 * @param dsList :List of dataset names to verify the duplicate.
691 * @param allDataSets : Element that includes all dataset definitions.
692 * @param dsNameSpace : Data set name space
693 * @throws CoordinatorJobException
694 * @throws Exception
695 */
696 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
697 throws CoordinatorJobException {
698 Element tmpDataSets = null;
699 try {
700 String dsXml = readDefinition(incDSFile);
701 log.debug("DSFILE :" + incDSFile + "\n" + dsXml);
702 tmpDataSets = XmlUtils.parseXml(dsXml);
703 }
704 /*
705 * catch (IOException iex) {XLog.getLog(getClass()).warn(
706 * "Error reading included dataset file [{0}]. Message [{1}]",
707 * incDSFile, iex.getMessage()); throw new
708 * CommandException(ErrorCode.E0803, iex.getMessage()); }
709 */
710 catch (JDOMException e) {
711 log.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage());
712 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
713 }
714 resolveDataSets((List<Element>) tmpDataSets.getChildren("dataset"));
715 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
716 String dsName = (String) e.getAttributeValue("name");
717 if (dsList.contains(dsName)) {
718 throw new RuntimeException("Duplicate Dataset " + dsName);
719 }
720 dsList.add(dsName);
721 Element tmp = (Element) e.clone();
722 // TODO: Don't like to over-write the external/include DS's
723 // namespace
724 tmp.setNamespace(dsNameSpace);// TODO:
725 tmp.getChild("uri-template").setNamespace(dsNameSpace);
726 if (e.getChild("done-flag") != null) {
727 tmp.getChild("done-flag").setNamespace(dsNameSpace);
728 }
729 allDataSets.addContent(tmp);
730 }
731 // nested include
732 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
733 String incFile = includeElem.getTextTrim();
734 // log.warn("incDSFile "+ incDSFile);
735 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
736 }
737 }
738
739 /**
740 * Remove a dataset from a list of dataset.
741 *
742 * @param eDatasets : List of dataset
743 * @param name : Dataset name to be removed.
744 */
745 private static void removeDataSet(Element eDatasets, String name) {
746 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
747 if (eDataset.getAttributeValue("name").equals(name)) {
748 eDataset.detach();
749 }
750 }
751 throw new RuntimeException("undefined dataset: " + name);
752 }
753
754 /**
755 * Read workflow definition.
756 *
757 * @param appPath application path.
758 * @param user user name.
759 * @param group group name.
760 * @param autToken authentication token.
761 * @return workflow definition.
762 * @throws WorkflowException thrown if the definition could not be read.
763 */
764 protected String readDefinition(String appPath) throws CoordinatorJobException {
765 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
766 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
767 Configuration confHadoop = CoordUtils.getHadoopConf(conf);
768 try {
769 URI uri = new URI(appPath);
770 log.debug("user =" + user + " group =" + group);
771 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri, conf);
772 Path p = new Path(uri.getPath());
773
774 // Reader reader = new InputStreamReader(fs.open(new Path(uri
775 // .getPath(), fileName)));
776 Reader reader = new InputStreamReader(fs.open(p));// TODO
777 StringWriter writer = new StringWriter();
778 IOUtils.copyCharStream(reader, writer);
779 return writer.toString();
780 }
781 catch (IOException ex) {
782 log.warn("IOException :" + XmlUtils.prettyPrint(confHadoop), ex);
783 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); // TODO:
784 }
785 catch (URISyntaxException ex) {
786 log.warn("URISyException :" + ex.getMessage());
787 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);// TODO:
788 }
789 catch (HadoopAccessorException ex) {
790 throw new CoordinatorJobException(ex);
791 }
792 catch (Exception ex) {
793 log.warn("Exception :", ex);
794 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);// TODO:
795 }
796 }
797
798 /**
799 * Write a Coordinator Job into database
800 *
801 * @param eJob : XML element of job
802 * @param store : Coordinator Store to write.
803 * @param coordJob : Coordinator job bean
804 * @return Job if.
805 * @throws StoreException
806 */
807 private String storeToDB(Element eJob, CoordinatorStore store, CoordinatorJobBean coordJob) throws StoreException {
808 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
809 coordJob.setId(jobId);
810 coordJob.setAuthToken(this.authToken);
811 coordJob.setAppName(eJob.getAttributeValue("name"));
812 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
813 coordJob.setStatus(CoordinatorJob.Status.PREP);
814 coordJob.setCreatedTime(new Date()); // TODO: Do we need that?
815 coordJob.setUser(conf.get(OozieClient.USER_NAME));
816 coordJob.setGroup(conf.get(OozieClient.GROUP_NAME));
817 coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
818 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
819 coordJob.setLastActionNumber(0);
820 coordJob.setLastModifiedTime(new Date());
821
822 if (!dryrun) {
823 store.insertCoordinatorJob(coordJob);
824 }
825 return jobId;
826 }
827
828 /**
829 * For unit-testing only. Will ultimately go away
830 *
831 * @param args
832 * @throws Exception
833 * @throws JDOMException
834 */
835 public static void main(String[] args) throws Exception {
836 // TODO Auto-generated method stub
837 // Configuration conf = new XConfiguration(IOUtils.getResourceAsReader(
838 // "org/apache/oozie/coord/conf.xml", -1));
839
840 Configuration conf = new XConfiguration();
841
842 // base case
843 // conf.set(OozieClient.COORDINATOR_APP_PATH,
844 // "file:///Users/danielwo/oozie/workflows/coord/test1/");
845
846 // no input datasets
847 // conf.set(OozieClient.COORDINATOR_APP_PATH,
848 // "file:///Users/danielwo/oozie/workflows/coord/coord_noinput/");
849 // conf.set(OozieClient.COORDINATOR_APP_PATH,
850 // "file:///Users/danielwo/oozie/workflows/coord/coord_use_apppath/");
851
852 // only 1 instance
853 // conf.set(OozieClient.COORDINATOR_APP_PATH,
854 // "file:///Users/danielwo/oozie/workflows/coord/coord_oneinstance/");
855
856 // no local props in xml
857 // conf.set(OozieClient.COORDINATOR_APP_PATH,
858 // "file:///Users/danielwo/oozie/workflows/coord/coord_noprops/");
859
860 conf.set(OozieClient.COORDINATOR_APP_PATH,
861 "file:///homes/test/workspace/sandbox_krishna/oozie-main/core/src/main/java/org/apache/oozie/coord/");
862 conf.set(OozieClient.USER_NAME, "test");
863 // conf.set(OozieClient.USER_NAME, "danielwo");
864 conf.set(OozieClient.GROUP_NAME, "other");
865 // System.out.println("appXml :"+ appXml + "\n conf :"+ conf);
866 new Services().init();
867 try {
868 CoordSubmitCommand sc = new CoordSubmitCommand(conf, "TESTING");
869 String jobId = sc.call();
870 System.out.println("Job Id " + jobId);
871 Thread.sleep(80000);
872 }
873 finally {
874 Services.get().destroy();
875 }
876 }
877 }