This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.InputStreamReader;
022 import java.io.Reader;
023 import java.io.StringReader;
024 import java.io.StringWriter;
025 import java.net.URI;
026 import java.net.URISyntaxException;
027 import java.text.DateFormat;
028 import java.text.SimpleDateFormat;
029 import java.util.ArrayList;
030 import java.util.Date;
031 import java.util.HashMap;
032 import java.util.HashSet;
033 import java.util.Iterator;
034 import java.util.List;
035 import java.util.Map;
036 import java.util.Set;
037 import java.util.TimeZone;
038 import java.util.TreeSet;
039
040 import javax.xml.transform.stream.StreamSource;
041 import javax.xml.validation.Validator;
042
043 import org.apache.hadoop.conf.Configuration;
044 import org.apache.hadoop.fs.FileSystem;
045 import org.apache.hadoop.fs.Path;
046 import org.apache.oozie.CoordinatorJobBean;
047 import org.apache.oozie.ErrorCode;
048 import org.apache.oozie.client.CoordinatorJob;
049 import org.apache.oozie.client.Job;
050 import org.apache.oozie.client.OozieClient;
051 import org.apache.oozie.client.CoordinatorJob.Execution;
052 import org.apache.oozie.command.CommandException;
053 import org.apache.oozie.command.SubmitTransitionXCommand;
054 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
055 import org.apache.oozie.coord.CoordELEvaluator;
056 import org.apache.oozie.coord.CoordELFunctions;
057 import org.apache.oozie.coord.CoordinatorJobException;
058 import org.apache.oozie.coord.TimeUnit;
059 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
060 import org.apache.oozie.executor.jpa.JPAExecutorException;
061 import org.apache.oozie.service.DagXLogInfoService;
062 import org.apache.oozie.service.HadoopAccessorException;
063 import org.apache.oozie.service.HadoopAccessorService;
064 import org.apache.oozie.service.JPAService;
065 import org.apache.oozie.service.SchemaService;
066 import org.apache.oozie.service.Service;
067 import org.apache.oozie.service.Services;
068 import org.apache.oozie.service.UUIDService;
069 import org.apache.oozie.service.WorkflowAppService;
070 import org.apache.oozie.service.SchemaService.SchemaName;
071 import org.apache.oozie.service.UUIDService.ApplicationType;
072 import org.apache.oozie.util.ConfigUtils;
073 import org.apache.oozie.util.DateUtils;
074 import org.apache.oozie.util.ELEvaluator;
075 import org.apache.oozie.util.IOUtils;
076 import org.apache.oozie.util.InstrumentUtils;
077 import org.apache.oozie.util.LogUtils;
078 import org.apache.oozie.util.ParamChecker;
079 import org.apache.oozie.util.PropertiesUtils;
080 import org.apache.oozie.util.XConfiguration;
081 import org.apache.oozie.util.XLog;
082 import org.apache.oozie.util.XmlUtils;
083 import org.jdom.Attribute;
084 import org.jdom.Element;
085 import org.jdom.JDOMException;
086 import org.jdom.Namespace;
087 import org.xml.sax.SAXException;
088
089 /**
090 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB
091 * table.
092 * <p/>
093 * Specifically it performs the following functions: 1. Resolve all the variables or properties using job
094 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML
095 * at runtime.
096 */
097 public class CoordSubmitXCommand extends SubmitTransitionXCommand {
098
099 private Configuration conf;
100 private final String authToken;
101 private final String bundleId;
102 private final String coordName;
103 private boolean dryrun;
104 private JPAService jpaService = null;
105 private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
106
107 public static final String CONFIG_DEFAULT = "coord-config-default.xml";
108 public static final String COORDINATOR_XML_FILE = "coordinator.xml";
109 public final String COORD_INPUT_EVENTS ="input-events";
110 public final String COORD_OUTPUT_EVENTS = "output-events";
111 public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
112 public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";
113
114 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
115 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
116
117 private CoordinatorJobBean coordJob = null;
118 /**
119 * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
120 */
121 public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
122
123 public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";
124
125 public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";
126
127 public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
128 + "coord.materialization.throttling.factor";
129
130 /**
131 * Default MAX timeout in minutes, after which coordinator input check will timeout
132 */
133 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
134
135
136 public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
137
138 private ELEvaluator evalFreq = null;
139 private ELEvaluator evalNofuncs = null;
140 private ELEvaluator evalData = null;
141 private ELEvaluator evalInst = null;
142 private ELEvaluator evalAction = null;
143 private ELEvaluator evalSla = null;
144
145 static {
146 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
147 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
148 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
149 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
150 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
151 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
152
153 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
154 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
155 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
156 }
157
/**
 * Builds a submit command for a coordinator that is not owned by a bundle.
 *
 * @param conf configuration of the coordinator job, checked with {@code ParamChecker.notNull}
 * @param authToken authentication token, checked with {@code ParamChecker.notEmpty}
 */
public CoordSubmitXCommand(Configuration conf, String authToken) {
    super("coord_submit", "coord_submit", 1);
    // No bundle is involved, so both bundle-related fields stay unset.
    this.bundleId = null;
    this.coordName = null;
    this.conf = ParamChecker.notNull(conf, "conf");
    this.authToken = ParamChecker.notEmpty(authToken, "authToken");
}
171
172 /**
173 * Constructor to create the Coordinator Submit Command by bundle job.
174 *
175 * @param conf : Configuration for Coordinator job
176 * @param authToken : To be used for authentication
177 * @param bundleId : bundle id
178 * @param coordName : coord name
179 */
180 public CoordSubmitXCommand(Configuration conf, String authToken, String bundleId, String coordName) {
181 super("coord_submit", "coord_submit", 1);
182 this.conf = ParamChecker.notNull(conf, "conf");
183 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
184 this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
185 this.coordName = ParamChecker.notEmpty(coordName, "coordName");
186 }
187
/**
 * Builds a submit command for a coordinator that is not owned by a bundle, optionally as a dry run.
 *
 * @param dryrun true to run the submission as a dry run
 * @param conf configuration of the coordinator job
 * @param authToken authentication token
 */
public CoordSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
    // Argument checks are delegated to the two-argument constructor.
    this(conf, authToken);
    this.dryrun = dryrun;
}
199
200 /* (non-Javadoc)
201 * @see org.apache.oozie.command.XCommand#execute()
202 */
203 @Override
204 protected String submit() throws CommandException {
205 String jobId = null;
206 LOG.info("STARTED Coordinator Submit");
207 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
208
209 boolean exceptionOccured = false;
210 try {
211 mergeDefaultConfig();
212
213 String appXml = readAndValidateXml();
214 coordJob.setOrigJobXml(appXml);
215 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
216
217 String appNamespace = readAppNamespace(appXml);
218 coordJob.setAppNamespace(appNamespace);
219
220 appXml = XmlUtils.removeComments(appXml);
221 initEvaluators();
222 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
223
224 // checking if the coordinator application data input/output events
225 // specify multiple data instance values in erroneous manner
226 checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
227 checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);
228
229 LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
230
231 jobId = storeToDB(eJob, coordJob);
232 // log job info for coordinator job
233 LogUtils.setLogInfo(coordJob, logInfo);
234 LOG = XLog.resetPrefix(LOG);
235
236 if (!dryrun) {
237 // submit a command to materialize jobs for the next 1 hour (3600 secs)
238 // so we don't wait 10 mins for the Service to run.
239 queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
240 }
241 else {
242 Date startTime = coordJob.getStartTime();
243 long startTimeMilli = startTime.getTime();
244 long endTimeMilli = startTimeMilli + (3600 * 1000);
245 Date jobEndTime = coordJob.getEndTime();
246 Date endTime = new Date(endTimeMilli);
247 if (endTime.compareTo(jobEndTime) > 0) {
248 endTime = jobEndTime;
249 }
250 jobId = coordJob.getId();
251 LOG.info("[" + jobId + "]: Update status to RUNNING");
252 coordJob.setStatus(Job.Status.RUNNING);
253 coordJob.setPending();
254 CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
255 endTime);
256 Configuration jobConf = null;
257 try {
258 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
259 }
260 catch (IOException e1) {
261 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
262 }
263 String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
264 String output = coordJob.getJobXml() + System.getProperty("line.separator")
265 + "***actions for instance***" + action;
266 return output;
267 }
268 }
269 catch (CoordinatorJobException cex) {
270 exceptionOccured = true;
271 LOG.warn("ERROR: ", cex);
272 throw new CommandException(cex);
273 }
274 catch (IllegalArgumentException iex) {
275 exceptionOccured = true;
276 LOG.warn("ERROR: ", iex);
277 throw new CommandException(ErrorCode.E1003, iex);
278 }
279 catch (Exception ex) {
280 exceptionOccured = true;
281 LOG.warn("ERROR: ", ex);
282 throw new CommandException(ErrorCode.E0803, ex);
283 }
284 finally {
285 if (exceptionOccured) {
286 if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){
287 coordJob.setStatus(CoordinatorJob.Status.FAILED);
288 coordJob.resetPending();
289 }
290 }
291 }
292
293 LOG.info("ENDED Coordinator Submit jobId=" + jobId);
294 return jobId;
295 }
296
297 /*
298 * Check against multiple data instance values inside a single <instance> tag
299 * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list
300 */
301 private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
302 Element eventsSpec, dataSpec, instance;
303 List<Element> instanceSpecList;
304 Namespace ns = eCoordJob.getNamespace();
305 String instanceValue;
306 eventsSpec = eCoordJob.getChild(eventType, ns);
307 if (eventsSpec != null) {
308 dataSpec = eventsSpec.getChild(dataType, ns);
309 if (dataSpec != null) {
310 // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
311 instanceSpecList = dataSpec.getChildren("instance", ns);
312 Iterator instanceIter = instanceSpecList.iterator();
313 while(instanceIter.hasNext()) {
314 instance = ((Element) instanceIter.next());
315 if(instance.getContentSize() == 0) { //empty string or whitespace
316 throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
317 }
318 instanceValue = instance.getContent(0).toString();
319 boolean isInvalid = false;
320 try {
321 isInvalid = evalAction.checkForExistence(instanceValue, ",");
322 } catch (Exception e) {
323 handleELParseException(eventType, dataType, instanceValue);
324 }
325 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
326 handleExpresionWithMultipleInstances(eventType, dataType, instanceValue);
327 }
328 }
329 }
330 }
331 }
332
333 private void handleELParseException(String eventType, String dataType, String instanceValue)
334 throws CoordinatorJobException {
335 String correctAction = null;
336 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
337 correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
338 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
339 correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
340 }
341 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
342 + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
343 }
344
345 private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue)
346 throws CoordinatorJobException {
347 String correctAction = null;
348 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
349 correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
350 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
351 correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
352 }
353 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
354 + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
355 }
356
357 /**
358 * Read the application XML and validate against coordinator Schema
359 *
360 * @return validated coordinator XML
361 * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
362 */
363 private String readAndValidateXml() throws CoordinatorJobException {
364 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
365 OozieClient.COORDINATOR_APP_PATH);
366 String coordXml = readDefinition(appPath);
367 validateXml(coordXml);
368 return coordXml;
369 }
370
371 /**
372 * Validate against Coordinator XSD file
373 *
374 * @param xmlContent : Input coordinator xml
375 * @throws CoordinatorJobException thrown if unable to validate coordinator xml
376 */
377 private void validateXml(String xmlContent) throws CoordinatorJobException {
378 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
379 Validator validator = schema.newValidator();
380 try {
381 validator.validate(new StreamSource(new StringReader(xmlContent)));
382 }
383 catch (SAXException ex) {
384 LOG.warn("SAXException :", ex);
385 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
386 }
387 catch (IOException ex) {
388 LOG.warn("IOException :", ex);
389 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
390 }
391 }
392
393 /**
394 * Read the application XML schema namespace
395 *
396 * @param xmlContent input coordinator xml
397 * @return app xml namespace
398 * @throws CoordinatorJobException
399 */
400 private String readAppNamespace(String xmlContent) throws CoordinatorJobException {
401 try {
402 Element coordXmlElement = XmlUtils.parseXml(xmlContent);
403 Namespace ns = coordXmlElement.getNamespace();
404 if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
405 throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
406 + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
407 }
408 if (ns != null) {
409 return ns.getURI();
410 }
411 else {
412 throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
413 }
414 }
415 catch (JDOMException ex) {
416 LOG.warn("JDOMException :", ex);
417 throw new CoordinatorJobException(ErrorCode.E0700, ex.getMessage(), ex);
418 }
419 }
420
421 /**
422 * Merge default configuration with user-defined configuration.
423 *
424 * @throws CommandException thrown if failed to read or merge configurations
425 */
426 protected void mergeDefaultConfig() throws CommandException {
427 Path configDefault = null;
428 try {
429 String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
430 Path coordAppPath = new Path(coordAppPathStr);
431 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
432 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
433 Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
434 FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);
435
436 // app path could be a directory
437 if (!fs.isFile(coordAppPath)) {
438 configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
439 } else {
440 configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
441 }
442
443 if (fs.exists(configDefault)) {
444 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
445 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
446 XConfiguration.injectDefaults(defaultConf, conf);
447 }
448 else {
449 LOG.info("configDefault Doesn't exist " + configDefault);
450 }
451 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
452
453 // Resolving all variables in the job properties.
454 // This ensures the Hadoop Configuration semantics is preserved.
455 XConfiguration resolvedVarsConf = new XConfiguration();
456 for (Map.Entry<String, String> entry : conf) {
457 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
458 }
459 conf = resolvedVarsConf;
460 }
461 catch (IOException e) {
462 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
463 + configDefault, e);
464 }
465 catch (HadoopAccessorException e) {
466 throw new CommandException(e);
467 }
468 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
469 }
470
471 /**
472 * The method resolve all the variables that are defined in configuration. It also include the data set definition
473 * from dataset file into XML.
474 *
475 * @param appXml : Original job XML
476 * @param conf : Configuration of the job
477 * @param coordJob : Coordinator job bean to be populated.
478 * @return Resolved and modified job XML element.
479 * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
480 * @throws Exception thrown if failed to resolve basic entities or include referred datasets
481 */
482 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
483 throws CoordinatorJobException, Exception {
484 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
485 includeDataSets(basicResolvedApp, conf);
486 return basicResolvedApp;
487 }
488
489 /**
490 * Insert data set into data-in and data-out tags.
491 *
492 * @param eAppXml : coordinator application XML
493 * @param eDatasets : DataSet XML
494 */
495 @SuppressWarnings("unchecked")
496 private void insertDataSet(Element eAppXml, Element eDatasets) {
497 // Adding DS definition in the coordinator XML
498 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
499 if (inputList != null) {
500 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
501 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
502 dataIn.getContent().add(0, eDataset);
503 }
504 }
505 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
506 if (outputList != null) {
507 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
508 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
509 dataOut.getContent().add(0, eDataset);
510 }
511 }
512 }
513
514 /**
515 * Find a specific dataset from a list of Datasets.
516 *
517 * @param eDatasets : List of data sets
518 * @param name : queried data set name
519 * @return one Dataset element. otherwise throw Exception
520 */
521 @SuppressWarnings("unchecked")
522 private static Element findDataSet(Element eDatasets, String name) {
523 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
524 if (eDataset.getAttributeValue("name").equals(name)) {
525 eDataset = (Element) eDataset.clone();
526 eDataset.detach();
527 return eDataset;
528 }
529 }
530 throw new RuntimeException("undefined dataset: " + name);
531 }
532
533 /**
534 * Initialize all the required EL Evaluators.
535 */
536 protected void initEvaluators() {
537 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
538 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
539 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
540 evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
541 evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
542 }
543
544 /**
545 * Resolve basic entities using job Configuration.
546 *
547 * @param conf :Job configuration
548 * @param appXml : Original job XML
549 * @param coordJob : Coordinator job bean to be populated.
550 * @return Resolved job XML element.
551 * @throws CoordinatorJobException thrown if failed to resolve basic entities
552 * @throws Exception thrown if failed to resolve basic entities
553 */
554 @SuppressWarnings("unchecked")
555 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
556 throws CoordinatorJobException, Exception {
557 Element eAppXml = XmlUtils.parseXml(appXml);
558 // job's main attributes
559 // frequency
560 String val = resolveAttribute("frequency", eAppXml, evalFreq);
561 int ival = ParamChecker.checkInteger(val, "frequency");
562 ParamChecker.checkGTZero(ival, "frequency");
563 coordJob.setFrequency(ival);
564 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
565 .getVariable("timeunit"));
566 addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
567 // TimeUnit
568 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
569 // End Of Duration
570 tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
571 .getVariable("endOfDuration"));
572 addAnAttribute("end_of_duration", eAppXml, tmp.toString());
573 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
574
575 // Application name
576 if (this.coordName == null) {
577 val = resolveAttribute("name", eAppXml, evalNofuncs);
578 coordJob.setAppName(val);
579 }
580 else {
581 // this coord job is created from bundle
582 coordJob.setAppName(this.coordName);
583 }
584
585 // start time
586 val = resolveAttribute("start", eAppXml, evalNofuncs);
587 ParamChecker.checkUTC(val, "start");
588 coordJob.setStartTime(DateUtils.parseDateUTC(val));
589 // end time
590 val = resolveAttribute("end", eAppXml, evalNofuncs);
591 ParamChecker.checkUTC(val, "end");
592 coordJob.setEndTime(DateUtils.parseDateUTC(val));
593 // Time zone
594 val = resolveAttribute("timezone", eAppXml, evalNofuncs);
595 ParamChecker.checkTimeZone(val, "timezone");
596 coordJob.setTimeZone(val);
597
598 // controls
599 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
600 if (val == "") {
601 val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
602 }
603
604 ival = ParamChecker.checkInteger(val, "timeout");
605 if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
606 ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
607 }
608 coordJob.setTimeout(ival);
609
610 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
611 if (val == null || val.isEmpty()) {
612 val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1");
613 }
614 ival = ParamChecker.checkInteger(val, "concurrency");
615 coordJob.setConcurrency(ival);
616
617 val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
618 if (val == null || val.isEmpty()) {
619 int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
620 ival = defaultThrottle;
621 }
622 else {
623 ival = ParamChecker.checkInteger(val, "throttle");
624 }
625 int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
626 float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
627 int maxThrottle = (int) (maxQueue * factor);
628 if (ival > maxThrottle || ival < 1) {
629 ival = maxThrottle;
630 }
631 LOG.debug("max throttle " + ival);
632 coordJob.setMatThrottling(ival);
633
634 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
635 if (val == "") {
636 val = Execution.FIFO.toString();
637 }
638 coordJob.setExecution(Execution.valueOf(val));
639 String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
640 ParamChecker.isMember(val, acceptedVals, "execution");
641
642 // datasets
643 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
644 // for each data set
645 resolveDataSets(eAppXml);
646 HashMap<String, String> dataNameList = new HashMap<String, String>();
647 resolveIOEvents(eAppXml, dataNameList);
648
649 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
650 eAppXml.getNamespace()), evalNofuncs);
651 // TODO: If action or workflow tag is missing, NullPointerException will
652 // occur
653 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
654 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
655 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
656 if (configElem != null) {
657 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
658 resolveTagContents("name", propElem, evalData);
659 // Want to check the data-integrity but don't want to modify the
660 // XML
661 // for properties only
662 Element tmpProp = (Element) propElem.clone();
663 resolveTagContents("value", tmpProp, evalData);
664 }
665 }
666 resolveSLA(eAppXml, coordJob);
667 return eAppXml;
668 }
669
670 /**
671 * Resolve SLA events
672 *
673 * @param eAppXml job XML
674 * @param coordJob coordinator job bean
675 * @throws CommandException thrown if failed to resolve sla events
676 */
677 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
678 Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info",
679 Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
680
681 if (eSla != null) {
682 String slaXml = XmlUtils.prettyPrint(eSla).toString();
683 try {
684 // EL evaluation
685 slaXml = evalSla.evaluate(slaXml, String.class);
686 // Validate against semantic SXD
687 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
688 }
689 catch (Exception e) {
690 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
691 }
692 }
693 }
694
695 /**
696 * Resolve input-events/data-in and output-events/data-out tags.
697 *
698 * @param eJob : Job element
699 * @throws CoordinatorJobException thrown if failed to resolve input and output events
700 */
701 @SuppressWarnings("unchecked")
702 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
703 // Resolving input-events/data-in
704 // Clone the job and don't update anything in the original
705 Element eJob = (Element) eJobOrg.clone();
706 Element inputList = eJob.getChild("input-events", eJob.getNamespace());
707 if (inputList != null) {
708 TreeSet<String> eventNameSet = new TreeSet<String>();
709 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
710 String dataInName = dataIn.getAttributeValue("name");
711 dataNameList.put(dataInName, "data-in");
712 // check whether there is any duplicate data-in name
713 if (eventNameSet.contains(dataInName)) {
714 throw new RuntimeException("Duplicate dataIn name " + dataInName);
715 }
716 else {
717 eventNameSet.add(dataInName);
718 }
719 resolveTagContents("instance", dataIn, evalInst);
720 resolveTagContents("start-instance", dataIn, evalInst);
721 resolveTagContents("end-instance", dataIn, evalInst);
722 }
723 }
724 // Resolving output-events/data-out
725 Element outputList = eJob.getChild("output-events", eJob.getNamespace());
726 if (outputList != null) {
727 TreeSet<String> eventNameSet = new TreeSet<String>();
728 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
729 String dataOutName = dataOut.getAttributeValue("name");
730 dataNameList.put(dataOutName, "data-out");
731 // check whether there is any duplicate data-out name
732 if (eventNameSet.contains(dataOutName)) {
733 throw new RuntimeException("Duplicate dataIn name " + dataOutName);
734 }
735 else {
736 eventNameSet.add(dataOutName);
737 }
738 resolveTagContents("instance", dataOut, evalInst);
739 }
740 }
741
742 }
743
744 /**
745 * Add an attribute into XML element.
746 *
747 * @param attrName :attribute name
748 * @param elem : Element to add attribute
749 * @param value :Value of attribute
750 */
751 private void addAnAttribute(String attrName, Element elem, String value) {
752 elem.setAttribute(attrName, value);
753 }
754
755 /**
756 * Resolve datasets using job configuration.
757 *
758 * @param eAppXml : Job Element XML
759 * @throws Exception thrown if failed to resolve datasets
760 */
761 @SuppressWarnings("unchecked")
762 private void resolveDataSets(Element eAppXml) throws Exception {
763 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
764 if (datasetList != null) {
765
766 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
767 resolveDataSets(dsElems);
768 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
769 eAppXml.getNamespace()), evalNofuncs);
770 }
771 }
772
773 /**
774 * Resolve datasets using job configuration.
775 *
776 * @param dsElems : Data set XML element.
777 * @throws CoordinatorJobException thrown if failed to resolve datasets
778 */
779 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
780 for (Element dsElem : dsElems) {
781 // Setting up default TimeUnit and EndOFDuraion
782 evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
783 evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
784
785 String val = resolveAttribute("frequency", dsElem, evalFreq);
786 int ival = ParamChecker.checkInteger(val, "frequency");
787 ParamChecker.checkGTZero(ival, "frequency");
788 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
789 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
790 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
791 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
792 val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
793 ParamChecker.checkUTC(val, "initial-instance");
794 checkInitialInstance(val);
795 val = resolveAttribute("timezone", dsElem, evalNofuncs);
796 ParamChecker.checkTimeZone(val, "timezone");
797 resolveTagContents("uri-template", dsElem, evalNofuncs);
798 resolveTagContents("done-flag", dsElem, evalNofuncs);
799 }
800 }
801
802 /**
803 * Resolve the content of a tag.
804 *
805 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
806 * @param elem : Element where the tag exists.
807 * @param eval : EL evealuator
808 * @return Resolved tag content.
809 * @throws CoordinatorJobException thrown if failed to resolve tag content
810 */
811 @SuppressWarnings("unchecked")
812 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
813 String ret = "";
814 if (elem != null) {
815 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
816 if (tagElem != null) {
817 String updated;
818 try {
819 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
820
821 }
822 catch (Exception e) {
823 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
824 }
825 tagElem.removeContent();
826 tagElem.addContent(updated);
827 ret += updated;
828 }
829 }
830 }
831 return ret;
832 }
833
834 /**
835 * Resolve an attribute value.
836 *
837 * @param attrName : Attribute name.
838 * @param elem : XML Element where attribute is defiend
839 * @param eval : ELEvaluator used to resolve
840 * @return Resolved attribute value
841 * @throws CoordinatorJobException thrown if failed to resolve an attribute value
842 */
843 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
844 Attribute attr = elem.getAttribute(attrName);
845 String val = null;
846 if (attr != null) {
847 try {
848 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
849 }
850 catch (Exception e) {
851 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
852 }
853 attr.setValue(val);
854 }
855 return val;
856 }
857
858 /**
859 * Include referred datasets into XML.
860 *
861 * @param resolvedXml : Job XML element.
862 * @param conf : Job configuration
863 * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
864 */
865 @SuppressWarnings("unchecked")
866 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
867 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
868 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
869 List<String> dsList = new ArrayList<String>();
870 if (datasets != null) {
871 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
872 String incDSFile = includeElem.getTextTrim();
873 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
874 }
875 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
876 String dsName = e.getAttributeValue("name");
877 if (dsList.contains(dsName)) {// Override with this DS
878 // Remove old DS
879 removeDataSet(allDataSets, dsName);
880 }
881 else {
882 dsList.add(dsName);
883 }
884 allDataSets.addContent((Element) e.clone());
885 }
886 }
887 insertDataSet(resolvedXml, allDataSets);
888 resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
889 }
890
891 /**
892 * Include one dataset file.
893 *
894 * @param incDSFile : Include data set filename.
895 * @param dsList :List of dataset names to verify the duplicate.
896 * @param allDataSets : Element that includes all dataset definitions.
897 * @param dsNameSpace : Data set name space
898 * @throws CoordinatorJobException thrown if failed to include one dataset file
899 */
900 @SuppressWarnings("unchecked")
901 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
902 throws CoordinatorJobException {
903 Element tmpDataSets = null;
904 try {
905 String dsXml = readDefinition(incDSFile);
906 LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
907 tmpDataSets = XmlUtils.parseXml(dsXml);
908 }
909 catch (JDOMException e) {
910 LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage());
911 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
912 }
913 resolveDataSets(tmpDataSets.getChildren("dataset"));
914 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
915 String dsName = e.getAttributeValue("name");
916 if (dsList.contains(dsName)) {
917 throw new RuntimeException("Duplicate Dataset " + dsName);
918 }
919 dsList.add(dsName);
920 Element tmp = (Element) e.clone();
921 // TODO: Don't like to over-write the external/include DS's namespace
922 tmp.setNamespace(dsNameSpace);
923 tmp.getChild("uri-template").setNamespace(dsNameSpace);
924 if (e.getChild("done-flag") != null) {
925 tmp.getChild("done-flag").setNamespace(dsNameSpace);
926 }
927 allDataSets.addContent(tmp);
928 }
929 // nested include
930 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
931 String incFile = includeElem.getTextTrim();
932 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
933 }
934 }
935
936 /**
937 * Remove a dataset from a list of dataset.
938 *
939 * @param eDatasets : List of dataset
940 * @param name : Dataset name to be removed.
941 */
942 @SuppressWarnings("unchecked")
943 private static void removeDataSet(Element eDatasets, String name) {
944 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
945 if (eDataset.getAttributeValue("name").equals(name)) {
946 eDataset.detach();
947 }
948 }
949 throw new RuntimeException("undefined dataset: " + name);
950 }
951
952 /**
953 * Read coordinator definition.
954 *
955 * @param appPath application path.
956 * @return coordinator definition.
957 * @throws CoordinatorJobException thrown if the definition could not be read.
958 */
959 protected String readDefinition(String appPath) throws CoordinatorJobException {
960 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
961 // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
962 try {
963 URI uri = new URI(appPath);
964 LOG.debug("user =" + user);
965 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
966 Configuration fsConf = has.createJobConf(uri.getAuthority());
967 FileSystem fs = has.createFileSystem(user, uri, fsConf);
968 Path appDefPath = null;
969
970 // app path could be a directory
971 Path path = new Path(uri.getPath());
972 // check file exists for dataset include file, app xml already checked
973 if (!fs.exists(path)) {
974 throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
975 }
976 if (!fs.isFile(path)) {
977 appDefPath = new Path(path, COORDINATOR_XML_FILE);
978 } else {
979 appDefPath = path;
980 }
981
982 Reader reader = new InputStreamReader(fs.open(appDefPath));
983 StringWriter writer = new StringWriter();
984 IOUtils.copyCharStream(reader, writer);
985 return writer.toString();
986 }
987 catch (IOException ex) {
988 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
989 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
990 }
991 catch (URISyntaxException ex) {
992 LOG.warn("URISyException :" + ex.getMessage());
993 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
994 }
995 catch (HadoopAccessorException ex) {
996 throw new CoordinatorJobException(ex);
997 }
998 catch (Exception ex) {
999 LOG.warn("Exception :", ex);
1000 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1001 }
1002 }
1003
1004 /**
1005 * Write a coordinator job into database
1006 *
1007 * @param eJob : XML element of job
1008 * @param coordJob : Coordinator job bean
1009 * @return Job id
1010 * @throws CommandException thrown if unable to save coordinator job to db
1011 */
1012 private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException {
1013 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
1014 coordJob.setId(jobId);
1015 coordJob.setAuthToken(this.authToken);
1016
1017 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
1018 coordJob.setCreatedTime(new Date());
1019 coordJob.setUser(conf.get(OozieClient.USER_NAME));
1020 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
1021 coordJob.setGroup(group);
1022 coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
1023 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
1024 coordJob.setLastActionNumber(0);
1025 coordJob.setLastModifiedTime(new Date());
1026
1027 if (!dryrun) {
1028 coordJob.setLastModifiedTime(new Date());
1029 try {
1030 jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
1031 }
1032 catch (JPAExecutorException je) {
1033 throw new CommandException(je);
1034 }
1035 }
1036 return jobId;
1037 }
1038
1039 /*
1040 * this method checks if the initial-instance specified for a particular
1041 is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC
1042 */
1043 private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
1044 Date initialInstance, givenInstance;
1045 DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
1046 df.setTimeZone(TimeZone.getTimeZone("UTC"));
1047 try {
1048 initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
1049 givenInstance = DateUtils.parseDateUTC(val);
1050 }
1051 catch (Exception e) {
1052 throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val + "' to Date object. ",e);
1053 }
1054 if(givenInstance.compareTo(initialInstance) < 0) {
1055 throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + df.format(givenInstance) + " is earlier than the default initial instance " + df.format(initialInstance));
1056 }
1057 }
1058
1059 /* (non-Javadoc)
1060 * @see org.apache.oozie.command.XCommand#getEntityKey()
1061 */
1062 @Override
1063 public String getEntityKey() {
1064 return null;
1065 }
1066
1067 /* (non-Javadoc)
1068 * @see org.apache.oozie.command.XCommand#isLockRequired()
1069 */
1070 @Override
1071 protected boolean isLockRequired() {
1072 return false;
1073 }
1074
1075 /* (non-Javadoc)
1076 * @see org.apache.oozie.command.XCommand#loadState()
1077 */
1078 @Override
1079 protected void loadState() throws CommandException {
1080 jpaService = Services.get().get(JPAService.class);
1081 if (jpaService == null) {
1082 throw new CommandException(ErrorCode.E0610);
1083 }
1084 coordJob = new CoordinatorJobBean();
1085 if (this.bundleId != null) {
1086 // this coord job is created from bundle
1087 coordJob.setBundleId(this.bundleId);
1088 // first use bundle id if submit thru bundle
1089 logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
1090 LogUtils.setLogInfo(logInfo);
1091 }
1092 if (this.coordName != null) {
1093 // this coord job is created from bundle
1094 coordJob.setAppName(this.coordName);
1095 }
1096 setJob(coordJob);
1097
1098 }
1099
1100 /* (non-Javadoc)
1101 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
1102 */
1103 @Override
1104 protected void verifyPrecondition() throws CommandException {
1105
1106 }
1107
1108 /* (non-Javadoc)
1109 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1110 */
1111 @Override
1112 public void notifyParent() throws CommandException {
1113 // update bundle action
1114 if (coordJob.getBundleId() != null) {
1115 LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1116 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1117 bundleStatusUpdate.call();
1118 }
1119 }
1120
1121 /* (non-Javadoc)
1122 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
1123 */
1124 @Override
1125 public void updateJob() throws CommandException {
1126 }
1127
1128 /* (non-Javadoc)
1129 * @see org.apache.oozie.command.TransitionXCommand#getJob()
1130 */
1131 @Override
1132 public Job getJob() {
1133 return coordJob;
1134 }
1135 }