This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import java.io.IOException;
021 import java.io.InputStreamReader;
022 import java.io.Reader;
023 import java.io.StringReader;
024 import java.io.StringWriter;
025 import java.net.URI;
026 import java.net.URISyntaxException;
027 import java.util.ArrayList;
028 import java.util.Date;
029 import java.util.HashMap;
030 import java.util.HashSet;
031 import java.util.Iterator;
032 import java.util.List;
033 import java.util.Map;
034 import java.util.Set;
035 import java.util.TreeSet;
036
037 import javax.xml.transform.stream.StreamSource;
038 import javax.xml.validation.Validator;
039
040 import org.apache.hadoop.conf.Configuration;
041 import org.apache.hadoop.fs.FileSystem;
042 import org.apache.hadoop.fs.Path;
043 import org.apache.oozie.CoordinatorJobBean;
044 import org.apache.oozie.ErrorCode;
045 import org.apache.oozie.client.CoordinatorJob;
046 import org.apache.oozie.client.Job;
047 import org.apache.oozie.client.OozieClient;
048 import org.apache.oozie.client.CoordinatorJob.Execution;
049 import org.apache.oozie.command.CommandException;
050 import org.apache.oozie.command.SubmitTransitionXCommand;
051 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
052 import org.apache.oozie.coord.CoordELEvaluator;
053 import org.apache.oozie.coord.CoordELFunctions;
054 import org.apache.oozie.coord.CoordinatorJobException;
055 import org.apache.oozie.coord.TimeUnit;
056 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
057 import org.apache.oozie.executor.jpa.JPAExecutorException;
058 import org.apache.oozie.service.DagXLogInfoService;
059 import org.apache.oozie.service.HadoopAccessorException;
060 import org.apache.oozie.service.HadoopAccessorService;
061 import org.apache.oozie.service.JPAService;
062 import org.apache.oozie.service.SchemaService;
063 import org.apache.oozie.service.Service;
064 import org.apache.oozie.service.Services;
065 import org.apache.oozie.service.UUIDService;
066 import org.apache.oozie.service.SchemaService.SchemaName;
067 import org.apache.oozie.service.UUIDService.ApplicationType;
068 import org.apache.oozie.util.ConfigUtils;
069 import org.apache.oozie.util.DateUtils;
070 import org.apache.oozie.util.ELEvaluator;
071 import org.apache.oozie.util.ELUtils;
072 import org.apache.oozie.util.IOUtils;
073 import org.apache.oozie.util.InstrumentUtils;
074 import org.apache.oozie.util.LogUtils;
075 import org.apache.oozie.util.ParamChecker;
076 import org.apache.oozie.util.ParameterVerifier;
077 import org.apache.oozie.util.ParameterVerifierException;
078 import org.apache.oozie.util.PropertiesUtils;
079 import org.apache.oozie.util.XConfiguration;
080 import org.apache.oozie.util.XLog;
081 import org.apache.oozie.util.XmlUtils;
082 import org.jdom.Attribute;
083 import org.jdom.Element;
084 import org.jdom.JDOMException;
085 import org.jdom.Namespace;
086 import org.xml.sax.SAXException;
087
088 /**
089 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB
090 * table.
091 * <p/>
092 * Specifically it performs the following functions: 1. Resolve all the variables or properties using job
093 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML
094 * at runtime.
095 */
096 public class CoordSubmitXCommand extends SubmitTransitionXCommand {
097
098 private Configuration conf;
099 private final String bundleId;
100 private final String coordName;
101 private boolean dryrun;
102 private JPAService jpaService = null;
103 private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;
104
105 public static final String CONFIG_DEFAULT = "coord-config-default.xml";
106 public static final String COORDINATOR_XML_FILE = "coordinator.xml";
107 public final String COORD_INPUT_EVENTS ="input-events";
108 public final String COORD_OUTPUT_EVENTS = "output-events";
109 public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
110 public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";
111
112 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
113 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
114
115 private CoordinatorJobBean coordJob = null;
116 /**
117 * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
118 */
119 public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";
120
121 public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";
122
123 public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";
124
125 public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
126 + "coord.materialization.throttling.factor";
127
128 /**
129 * Default MAX timeout in minutes, after which coordinator input check will timeout
130 */
131 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
132
133
134 public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
135
136 private ELEvaluator evalFreq = null;
137 private ELEvaluator evalNofuncs = null;
138 private ELEvaluator evalData = null;
139 private ELEvaluator evalInst = null;
140 private ELEvaluator evalAction = null;
141 private ELEvaluator evalSla = null;
142
143 static {
144 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
145 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
146 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
147 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
148 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
149 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
150
151 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
152 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
153 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
154 }
155
/**
 * Creates a submit command for a coordinator job that is not owned by a bundle
 * (bundle id and bundle-assigned name are left null).
 *
 * @param conf configuration of the coordinator job, checked with ParamChecker.notNull
 */
public CoordSubmitXCommand(Configuration conf) {
    super("coord_submit", "coord_submit", 1);
    this.bundleId = null;
    this.coordName = null;
    this.conf = ParamChecker.notNull(conf, "conf");
}
167
168 /**
169 * Constructor to create the Coordinator Submit Command by bundle job.
170 *
171 * @param conf : Configuration for Coordinator job
172 * @param bundleId : bundle id
173 * @param coordName : coord name
174 */
175 public CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) {
176 super("coord_submit", "coord_submit", 1);
177 this.conf = ParamChecker.notNull(conf, "conf");
178 this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
179 this.coordName = ParamChecker.notEmpty(coordName, "coordName");
180 }
181
/**
 * Creates a stand-alone submit command with an explicit dryrun flag.
 *
 * @param dryrun when true, submit() returns the job XML and materialized actions
 *        instead of queueing a materialization command
 * @param conf configuration of the coordinator job
 */
public CoordSubmitXCommand(boolean dryrun, Configuration conf) {
    // Delegate the common initialisation, then record the mode.
    this(conf);
    this.dryrun = dryrun;
}
192
193 /* (non-Javadoc)
194 * @see org.apache.oozie.command.XCommand#execute()
195 */
196 @Override
197 protected String submit() throws CommandException {
198 String jobId = null;
199 LOG.info("STARTED Coordinator Submit");
200 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
201
202 boolean exceptionOccured = false;
203 try {
204 mergeDefaultConfig();
205
206 String appXml = readAndValidateXml();
207 coordJob.setOrigJobXml(appXml);
208 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
209
210 Element eXml = XmlUtils.parseXml(appXml);
211
212 String appNamespace = readAppNamespace(eXml);
213 coordJob.setAppNamespace(appNamespace);
214
215 ParameterVerifier.verifyParameters(conf, eXml);
216
217 appXml = XmlUtils.removeComments(appXml);
218 initEvaluators();
219 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);
220
221 validateCoordinatorJob();
222
223 // checking if the coordinator application data input/output events
224 // specify multiple data instance values in erroneous manner
225 checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
226 checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);
227
228 LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());
229
230 jobId = storeToDB(eJob, coordJob);
231 // log job info for coordinator job
232 LogUtils.setLogInfo(coordJob, logInfo);
233 LOG = XLog.resetPrefix(LOG);
234
235 if (!dryrun) {
236 // submit a command to materialize jobs for the next 1 hour (3600 secs)
237 // so we don't wait 10 mins for the Service to run.
238 queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
239 }
240 else {
241 Date startTime = coordJob.getStartTime();
242 long startTimeMilli = startTime.getTime();
243 long endTimeMilli = startTimeMilli + (3600 * 1000);
244 Date jobEndTime = coordJob.getEndTime();
245 Date endTime = new Date(endTimeMilli);
246 if (endTime.compareTo(jobEndTime) > 0) {
247 endTime = jobEndTime;
248 }
249 jobId = coordJob.getId();
250 LOG.info("[" + jobId + "]: Update status to RUNNING");
251 coordJob.setStatus(Job.Status.RUNNING);
252 coordJob.setPending();
253 CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
254 endTime);
255 Configuration jobConf = null;
256 try {
257 jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
258 }
259 catch (IOException e1) {
260 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
261 }
262 String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
263 String output = coordJob.getJobXml() + System.getProperty("line.separator")
264 + "***actions for instance***" + action;
265 return output;
266 }
267 }
268 catch (JDOMException jex) {
269 exceptionOccured = true;
270 LOG.warn("ERROR: ", jex);
271 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
272 }
273 catch (CoordinatorJobException cex) {
274 exceptionOccured = true;
275 LOG.warn("ERROR: ", cex);
276 throw new CommandException(cex);
277 }
278 catch (ParameterVerifierException pex) {
279 exceptionOccured = true;
280 LOG.warn("ERROR: ", pex);
281 throw new CommandException(pex);
282 }
283 catch (IllegalArgumentException iex) {
284 exceptionOccured = true;
285 LOG.warn("ERROR: ", iex);
286 throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex);
287 }
288 catch (Exception ex) {
289 exceptionOccured = true;
290 LOG.warn("ERROR: ", ex);
291 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
292 }
293 finally {
294 if (exceptionOccured) {
295 if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){
296 coordJob.setStatus(CoordinatorJob.Status.FAILED);
297 coordJob.resetPending();
298 }
299 }
300 }
301
302 LOG.info("ENDED Coordinator Submit jobId=" + jobId);
303 return jobId;
304 }
305
306 /**
307 * Method that validates values in the definition for correctness. Placeholder to add more.
308 */
309 private void validateCoordinatorJob() {
310 // check if startTime < endTime
311 if (coordJob.getStartTime().after(coordJob.getEndTime())) {
312 throw new IllegalArgumentException("Coordinator Start Time cannot be greater than End Time.");
313 }
314 }
315
316 /*
317 * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag
318 * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list
319 */
320 private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
321 Element eventsSpec, dataSpec, instance;
322 List<Element> instanceSpecList;
323 Namespace ns = eCoordJob.getNamespace();
324 String instanceValue;
325 eventsSpec = eCoordJob.getChild(eventType, ns);
326 if (eventsSpec != null) {
327 dataSpec = eventsSpec.getChild(dataType, ns);
328 if (dataSpec != null) {
329 // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
330 instanceSpecList = dataSpec.getChildren("instance", ns);
331 Iterator instanceIter = instanceSpecList.iterator();
332 while(instanceIter.hasNext()) {
333 instance = ((Element) instanceIter.next());
334 if(instance.getContentSize() == 0) { //empty string or whitespace
335 throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
336 }
337 instanceValue = instance.getContent(0).toString();
338 boolean isInvalid = false;
339 try {
340 isInvalid = evalAction.checkForExistence(instanceValue, ",");
341 } catch (Exception e) {
342 handleELParseException(eventType, dataType, instanceValue);
343 }
344 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
345 handleExpresionWithMultipleInstances(eventType, dataType, instanceValue);
346 }
347 }
348
349 // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors
350 instanceSpecList = dataSpec.getChildren("start-instance", ns);
351 instanceIter = instanceSpecList.iterator();
352 while(instanceIter.hasNext()) {
353 instance = ((Element) instanceIter.next());
354 if(instance.getContentSize() == 0) { //empty string or whitespace
355 throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!");
356 }
357 instanceValue = instance.getContent(0).toString();
358 boolean isInvalid = false;
359 try {
360 isInvalid = evalAction.checkForExistence(instanceValue, ",");
361 } catch (Exception e) {
362 handleELParseException(eventType, dataType, instanceValue);
363 }
364 if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0
365 handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue);
366 }
367 }
368
369 // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors
370 instanceSpecList = dataSpec.getChildren("end-instance", ns);
371 instanceIter = instanceSpecList.iterator();
372 while(instanceIter.hasNext()) {
373 instance = ((Element) instanceIter.next());
374 if(instance.getContentSize() == 0) { //empty string or whitespace
375 throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!");
376 }
377 instanceValue = instance.getContent(0).toString();
378 boolean isInvalid = false;
379 try {
380 isInvalid = evalAction.checkForExistence(instanceValue, ",");
381 } catch (Exception e) {
382 handleELParseException(eventType, dataType, instanceValue);
383 }
384 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
385 handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue);
386 }
387 }
388
389 }
390 }
391 }
392
393 private void handleELParseException(String eventType, String dataType, String instanceValue)
394 throws CoordinatorJobException {
395 String correctAction = null;
396 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
397 correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
398 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
399 correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
400 }
401 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
402 + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
403 }
404
405 private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue)
406 throws CoordinatorJobException {
407 String correctAction = null;
408 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
409 correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
410 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
411 correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
412 }
413 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
414 + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
415 }
416
417 private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue)
418 throws CoordinatorJobException {
419 String correctAction = "Coordinator app definition should not have multiple start-instances";
420 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue
421 + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction);
422 }
423
424 private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue)
425 throws CoordinatorJobException {
426 String correctAction = "Coordinator app definition should not have multiple end-instances";
427 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue
428 + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction);
429 }
430
431 /**
432 * Read the application XML and validate against coordinator Schema
433 *
434 * @return validated coordinator XML
435 * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml
436 */
437 private String readAndValidateXml() throws CoordinatorJobException {
438 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
439 OozieClient.COORDINATOR_APP_PATH);
440 String coordXml = readDefinition(appPath);
441 validateXml(coordXml);
442 return coordXml;
443 }
444
445 /**
446 * Validate against Coordinator XSD file
447 *
448 * @param xmlContent : Input coordinator xml
449 * @throws CoordinatorJobException thrown if unable to validate coordinator xml
450 */
451 private void validateXml(String xmlContent) throws CoordinatorJobException {
452 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
453 Validator validator = schema.newValidator();
454 try {
455 validator.validate(new StreamSource(new StringReader(xmlContent)));
456 }
457 catch (SAXException ex) {
458 LOG.warn("SAXException :", ex);
459 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
460 }
461 catch (IOException ex) {
462 LOG.warn("IOException :", ex);
463 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
464 }
465 }
466
467 /**
468 * Read the application XML schema namespace
469 *
470 * @param coordXmlElement input coordinator xml Element
471 * @return app xml namespace
472 * @throws CoordinatorJobException
473 */
474 private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException {
475 Namespace ns = coordXmlElement.getNamespace();
476 if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
477 throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
478 + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
479 }
480 if (ns != null) {
481 return ns.getURI();
482 }
483 else {
484 throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
485 }
486 }
487
488 /**
489 * Merge default configuration with user-defined configuration.
490 *
491 * @throws CommandException thrown if failed to read or merge configurations
492 */
493 protected void mergeDefaultConfig() throws CommandException {
494 Path configDefault = null;
495 try {
496 String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
497 Path coordAppPath = new Path(coordAppPathStr);
498 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
499 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
500 Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
501 FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);
502
503 // app path could be a directory
504 if (!fs.isFile(coordAppPath)) {
505 configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
506 } else {
507 configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
508 }
509
510 if (fs.exists(configDefault)) {
511 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
512 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
513 XConfiguration.injectDefaults(defaultConf, conf);
514 }
515 else {
516 LOG.info("configDefault Doesn't exist " + configDefault);
517 }
518 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
519
520 // Resolving all variables in the job properties.
521 // This ensures the Hadoop Configuration semantics is preserved.
522 XConfiguration resolvedVarsConf = new XConfiguration();
523 for (Map.Entry<String, String> entry : conf) {
524 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
525 }
526 conf = resolvedVarsConf;
527 }
528 catch (IOException e) {
529 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
530 + configDefault, e);
531 }
532 catch (HadoopAccessorException e) {
533 throw new CommandException(e);
534 }
535 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
536 }
537
538 /**
539 * The method resolve all the variables that are defined in configuration. It also include the data set definition
540 * from dataset file into XML.
541 *
542 * @param appXml : Original job XML
543 * @param conf : Configuration of the job
544 * @param coordJob : Coordinator job bean to be populated.
545 * @return Resolved and modified job XML element.
546 * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
547 * @throws Exception thrown if failed to resolve basic entities or include referred datasets
548 */
549 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
550 throws CoordinatorJobException, Exception {
551 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
552 includeDataSets(basicResolvedApp, conf);
553 return basicResolvedApp;
554 }
555
556 /**
557 * Insert data set into data-in and data-out tags.
558 *
559 * @param eAppXml : coordinator application XML
560 * @param eDatasets : DataSet XML
561 */
562 @SuppressWarnings("unchecked")
563 private void insertDataSet(Element eAppXml, Element eDatasets) {
564 // Adding DS definition in the coordinator XML
565 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
566 if (inputList != null) {
567 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
568 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
569 dataIn.getContent().add(0, eDataset);
570 }
571 }
572 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
573 if (outputList != null) {
574 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
575 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
576 dataOut.getContent().add(0, eDataset);
577 }
578 }
579 }
580
581 /**
582 * Find a specific dataset from a list of Datasets.
583 *
584 * @param eDatasets : List of data sets
585 * @param name : queried data set name
586 * @return one Dataset element. otherwise throw Exception
587 */
588 @SuppressWarnings("unchecked")
589 private static Element findDataSet(Element eDatasets, String name) {
590 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
591 if (eDataset.getAttributeValue("name").equals(name)) {
592 eDataset = (Element) eDataset.clone();
593 eDataset.detach();
594 return eDataset;
595 }
596 }
597 throw new RuntimeException("undefined dataset: " + name);
598 }
599
600 /**
601 * Initialize all the required EL Evaluators.
602 */
603 protected void initEvaluators() {
604 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
605 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
606 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
607 evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
608 evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
609 }
610
611 /**
612 * Resolve basic entities using job Configuration.
613 *
614 * @param conf :Job configuration
615 * @param appXml : Original job XML
616 * @param coordJob : Coordinator job bean to be populated.
617 * @return Resolved job XML element.
618 * @throws CoordinatorJobException thrown if failed to resolve basic entities
619 * @throws Exception thrown if failed to resolve basic entities
620 */
621 @SuppressWarnings("unchecked")
622 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
623 throws CoordinatorJobException, Exception {
624 Element eAppXml = XmlUtils.parseXml(appXml);
625 // job's main attributes
626 // frequency
627 String val = resolveAttribute("frequency", eAppXml, evalFreq);
628 int ival = ParamChecker.checkInteger(val, "frequency");
629 ParamChecker.checkGTZero(ival, "frequency");
630 coordJob.setFrequency(Integer.toString(ival));
631 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
632 .getVariable("timeunit"));
633 addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
634 // TimeUnit
635 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
636 // End Of Duration
637 tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
638 .getVariable("endOfDuration"));
639 addAnAttribute("end_of_duration", eAppXml, tmp.toString());
640 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean
641
642 // Application name
643 if (this.coordName == null) {
644 String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf);
645 coordJob.setAppName(name);
646 }
647 else {
648 // this coord job is created from bundle
649 coordJob.setAppName(this.coordName);
650 }
651
652 // start time
653 val = resolveAttribute("start", eAppXml, evalNofuncs);
654 ParamChecker.checkDateOozieTZ(val, "start");
655 coordJob.setStartTime(DateUtils.parseDateOozieTZ(val));
656 // end time
657 val = resolveAttribute("end", eAppXml, evalNofuncs);
658 ParamChecker.checkDateOozieTZ(val, "end");
659 coordJob.setEndTime(DateUtils.parseDateOozieTZ(val));
660 // Time zone
661 val = resolveAttribute("timezone", eAppXml, evalNofuncs);
662 ParamChecker.checkTimeZone(val, "timezone");
663 coordJob.setTimeZone(val);
664
665 // controls
666 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalFreq);
667 if (val == "") {
668 val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
669 }
670
671 ival = ParamChecker.checkInteger(val, "timeout");
672 if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
673 ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
674 }
675 coordJob.setTimeout(ival);
676
677 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
678 if (val == null || val.isEmpty()) {
679 val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1");
680 }
681 ival = ParamChecker.checkInteger(val, "concurrency");
682 coordJob.setConcurrency(ival);
683
684 val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
685 if (val == null || val.isEmpty()) {
686 int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
687 ival = defaultThrottle;
688 }
689 else {
690 ival = ParamChecker.checkInteger(val, "throttle");
691 }
692 int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
693 float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
694 int maxThrottle = (int) (maxQueue * factor);
695 if (ival > maxThrottle || ival < 1) {
696 ival = maxThrottle;
697 }
698 LOG.debug("max throttle " + ival);
699 coordJob.setMatThrottling(ival);
700
701 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
702 if (val == "") {
703 val = Execution.FIFO.toString();
704 }
705 coordJob.setExecution(Execution.valueOf(val));
706 String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
707 ParamChecker.isMember(val, acceptedVals, "execution");
708
709 // datasets
710 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
711 // for each data set
712 resolveDataSets(eAppXml);
713 HashMap<String, String> dataNameList = new HashMap<String, String>();
714 resolveIOEvents(eAppXml, dataNameList);
715
716 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
717 eAppXml.getNamespace()), evalNofuncs);
718 // TODO: If action or workflow tag is missing, NullPointerException will
719 // occur
720 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
721 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
722 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
723 if (configElem != null) {
724 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
725 resolveTagContents("name", propElem, evalData);
726 // Want to check the data-integrity but don't want to modify the
727 // XML
728 // for properties only
729 Element tmpProp = (Element) propElem.clone();
730 resolveTagContents("value", tmpProp, evalData);
731 }
732 }
733 resolveSLA(eAppXml, coordJob);
734 return eAppXml;
735 }
736
737 /**
738 * Resolve SLA events
739 *
740 * @param eAppXml job XML
741 * @param coordJob coordinator job bean
742 * @throws CommandException thrown if failed to resolve sla events
743 */
744 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
745 Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace()));
746
747 if (eSla != null) {
748 String slaXml = XmlUtils.prettyPrint(eSla).toString();
749 try {
750 // EL evaluation
751 slaXml = evalSla.evaluate(slaXml, String.class);
752 // Validate against semantic SXD
753 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
754 }
755 catch (Exception e) {
756 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
757 }
758 }
759 }
760
761 /**
762 * Resolve input-events/data-in and output-events/data-out tags.
763 *
764 * @param eJob : Job element
765 * @throws CoordinatorJobException thrown if failed to resolve input and output events
766 */
767 @SuppressWarnings("unchecked")
768 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
769 // Resolving input-events/data-in
770 // Clone the job and don't update anything in the original
771 Element eJob = (Element) eJobOrg.clone();
772 Element inputList = eJob.getChild("input-events", eJob.getNamespace());
773 if (inputList != null) {
774 TreeSet<String> eventNameSet = new TreeSet<String>();
775 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
776 String dataInName = dataIn.getAttributeValue("name");
777 dataNameList.put(dataInName, "data-in");
778 // check whether there is any duplicate data-in name
779 if (eventNameSet.contains(dataInName)) {
780 throw new RuntimeException("Duplicate dataIn name " + dataInName);
781 }
782 else {
783 eventNameSet.add(dataInName);
784 }
785 resolveTagContents("instance", dataIn, evalInst);
786 resolveTagContents("start-instance", dataIn, evalInst);
787 resolveTagContents("end-instance", dataIn, evalInst);
788 }
789 }
790 // Resolving output-events/data-out
791 Element outputList = eJob.getChild("output-events", eJob.getNamespace());
792 if (outputList != null) {
793 TreeSet<String> eventNameSet = new TreeSet<String>();
794 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
795 String dataOutName = dataOut.getAttributeValue("name");
796 dataNameList.put(dataOutName, "data-out");
797 // check whether there is any duplicate data-out name
798 if (eventNameSet.contains(dataOutName)) {
799 throw new RuntimeException("Duplicate dataIn name " + dataOutName);
800 }
801 else {
802 eventNameSet.add(dataOutName);
803 }
804 resolveTagContents("instance", dataOut, evalInst);
805 }
806 }
807
808 }
809
810 /**
811 * Add an attribute into XML element.
812 *
813 * @param attrName :attribute name
814 * @param elem : Element to add attribute
815 * @param value :Value of attribute
816 */
817 private void addAnAttribute(String attrName, Element elem, String value) {
818 elem.setAttribute(attrName, value);
819 }
820
821 /**
822 * Resolve datasets using job configuration.
823 *
824 * @param eAppXml : Job Element XML
825 * @throws Exception thrown if failed to resolve datasets
826 */
827 @SuppressWarnings("unchecked")
828 private void resolveDataSets(Element eAppXml) throws Exception {
829 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
830 if (datasetList != null) {
831
832 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
833 resolveDataSets(dsElems);
834 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
835 eAppXml.getNamespace()), evalNofuncs);
836 }
837 }
838
839 /**
840 * Resolve datasets using job configuration.
841 *
842 * @param dsElems : Data set XML element.
843 * @throws CoordinatorJobException thrown if failed to resolve datasets
844 */
845 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
846 for (Element dsElem : dsElems) {
847 // Setting up default TimeUnit and EndOFDuraion
848 evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
849 evalFreq.setVariable("endOfDuration", TimeUnit.NONE);
850
851 String val = resolveAttribute("frequency", dsElem, evalFreq);
852 int ival = ParamChecker.checkInteger(val, "frequency");
853 ParamChecker.checkGTZero(ival, "frequency");
854 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
855 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
856 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
857 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
858 val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
859 ParamChecker.checkDateOozieTZ(val, "initial-instance");
860 checkInitialInstance(val);
861 val = resolveAttribute("timezone", dsElem, evalNofuncs);
862 ParamChecker.checkTimeZone(val, "timezone");
863 resolveTagContents("uri-template", dsElem, evalNofuncs);
864 resolveTagContents("done-flag", dsElem, evalNofuncs);
865 }
866 }
867
868 /**
869 * Resolve the content of a tag.
870 *
871 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout>
872 * @param elem : Element where the tag exists.
873 * @param eval : EL evealuator
874 * @return Resolved tag content.
875 * @throws CoordinatorJobException thrown if failed to resolve tag content
876 */
877 @SuppressWarnings("unchecked")
878 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
879 String ret = "";
880 if (elem != null) {
881 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
882 if (tagElem != null) {
883 String updated;
884 try {
885 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
886
887 }
888 catch (Exception e) {
889 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
890 }
891 tagElem.removeContent();
892 tagElem.addContent(updated);
893 ret += updated;
894 }
895 }
896 }
897 return ret;
898 }
899
900 /**
901 * Resolve an attribute value.
902 *
903 * @param attrName : Attribute name.
904 * @param elem : XML Element where attribute is defiend
905 * @param eval : ELEvaluator used to resolve
906 * @return Resolved attribute value
907 * @throws CoordinatorJobException thrown if failed to resolve an attribute value
908 */
909 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
910 Attribute attr = elem.getAttribute(attrName);
911 String val = null;
912 if (attr != null) {
913 try {
914 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
915 }
916 catch (Exception e) {
917 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
918 }
919 attr.setValue(val);
920 }
921 return val;
922 }
923
924 /**
925 * Include referred datasets into XML.
926 *
927 * @param resolvedXml : Job XML element.
928 * @param conf : Job configuration
929 * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
930 */
931 @SuppressWarnings("unchecked")
932 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
933 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
934 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
935 List<String> dsList = new ArrayList<String>();
936 if (datasets != null) {
937 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
938 String incDSFile = includeElem.getTextTrim();
939 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
940 }
941 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
942 String dsName = e.getAttributeValue("name");
943 if (dsList.contains(dsName)) {// Override with this DS
944 // Remove duplicate
945 removeDataSet(allDataSets, dsName);
946 }
947 else {
948 dsList.add(dsName);
949 }
950 allDataSets.addContent((Element) e.clone());
951 }
952 }
953 insertDataSet(resolvedXml, allDataSets);
954 resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
955 }
956
957 /**
958 * Include one dataset file.
959 *
960 * @param incDSFile : Include data set filename.
961 * @param dsList :List of dataset names to verify the duplicate.
962 * @param allDataSets : Element that includes all dataset definitions.
963 * @param dsNameSpace : Data set name space
964 * @throws CoordinatorJobException thrown if failed to include one dataset file
965 */
966 @SuppressWarnings("unchecked")
967 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
968 throws CoordinatorJobException {
969 Element tmpDataSets = null;
970 try {
971 String dsXml = readDefinition(incDSFile);
972 LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
973 tmpDataSets = XmlUtils.parseXml(dsXml);
974 }
975 catch (JDOMException e) {
976 LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage());
977 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
978 }
979 resolveDataSets(tmpDataSets.getChildren("dataset"));
980 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
981 String dsName = e.getAttributeValue("name");
982 if (dsList.contains(dsName)) {
983 throw new RuntimeException("Duplicate Dataset " + dsName);
984 }
985 dsList.add(dsName);
986 Element tmp = (Element) e.clone();
987 // TODO: Don't like to over-write the external/include DS's namespace
988 tmp.setNamespace(dsNameSpace);
989 tmp.getChild("uri-template").setNamespace(dsNameSpace);
990 if (e.getChild("done-flag") != null) {
991 tmp.getChild("done-flag").setNamespace(dsNameSpace);
992 }
993 allDataSets.addContent(tmp);
994 }
995 // nested include
996 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
997 String incFile = includeElem.getTextTrim();
998 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
999 }
1000 }
1001
1002 /**
1003 * Remove a dataset from a list of dataset.
1004 *
1005 * @param eDatasets : List of dataset
1006 * @param name : Dataset name to be removed.
1007 */
1008 @SuppressWarnings("unchecked")
1009 private static void removeDataSet(Element eDatasets, String name) {
1010 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
1011 if (eDataset.getAttributeValue("name").equals(name)) {
1012 eDataset.detach();
1013 return;
1014 }
1015 }
1016 throw new RuntimeException("undefined dataset: " + name);
1017 }
1018
1019 /**
1020 * Read coordinator definition.
1021 *
1022 * @param appPath application path.
1023 * @return coordinator definition.
1024 * @throws CoordinatorJobException thrown if the definition could not be read.
1025 */
1026 protected String readDefinition(String appPath) throws CoordinatorJobException {
1027 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
1028 // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
1029 try {
1030 URI uri = new URI(appPath);
1031 LOG.debug("user =" + user);
1032 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
1033 Configuration fsConf = has.createJobConf(uri.getAuthority());
1034 FileSystem fs = has.createFileSystem(user, uri, fsConf);
1035 Path appDefPath = null;
1036
1037 // app path could be a directory
1038 Path path = new Path(uri.getPath());
1039 // check file exists for dataset include file, app xml already checked
1040 if (!fs.exists(path)) {
1041 throw new URISyntaxException(path.toString(), "path not existed : " + path.toString());
1042 }
1043 if (!fs.isFile(path)) {
1044 appDefPath = new Path(path, COORDINATOR_XML_FILE);
1045 } else {
1046 appDefPath = path;
1047 }
1048
1049 Reader reader = new InputStreamReader(fs.open(appDefPath));
1050 StringWriter writer = new StringWriter();
1051 IOUtils.copyCharStream(reader, writer);
1052 return writer.toString();
1053 }
1054 catch (IOException ex) {
1055 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
1056 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1057 }
1058 catch (URISyntaxException ex) {
1059 LOG.warn("URISyException :" + ex.getMessage());
1060 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
1061 }
1062 catch (HadoopAccessorException ex) {
1063 throw new CoordinatorJobException(ex);
1064 }
1065 catch (Exception ex) {
1066 LOG.warn("Exception :", ex);
1067 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
1068 }
1069 }
1070
1071 /**
1072 * Write a coordinator job into database
1073 *
1074 * @param eJob : XML element of job
1075 * @param coordJob : Coordinator job bean
1076 * @return Job id
1077 * @throws CommandException thrown if unable to save coordinator job to db
1078 */
1079 private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException {
1080 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
1081 coordJob.setId(jobId);
1082
1083 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
1084 coordJob.setCreatedTime(new Date());
1085 coordJob.setUser(conf.get(OozieClient.USER_NAME));
1086 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
1087 coordJob.setGroup(group);
1088 coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
1089 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
1090 coordJob.setLastActionNumber(0);
1091 coordJob.setLastModifiedTime(new Date());
1092
1093 if (!dryrun) {
1094 coordJob.setLastModifiedTime(new Date());
1095 try {
1096 jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
1097 }
1098 catch (JPAExecutorException jpaee) {
1099 coordJob.setId(null);
1100 coordJob.setStatus(CoordinatorJob.Status.FAILED);
1101 throw new CommandException(jpaee);
1102 }
1103 }
1104 return jobId;
1105 }
1106
1107 /*
1108 * this method checks if the initial-instance specified for a particular
1109 is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC
1110 */
1111 private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
1112 Date initialInstance, givenInstance;
1113 try {
1114 initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
1115 givenInstance = DateUtils.parseDateOozieTZ(val);
1116 }
1117 catch (Exception e) {
1118 throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val +
1119 "' to Date object. ",e);
1120 }
1121 if(givenInstance.compareTo(initialInstance) < 0) {
1122 throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val +
1123 " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance));
1124 }
1125 }
1126
1127 /* (non-Javadoc)
1128 * @see org.apache.oozie.command.XCommand#getEntityKey()
1129 */
1130 @Override
1131 public String getEntityKey() {
1132 return null;
1133 }
1134
1135 /* (non-Javadoc)
1136 * @see org.apache.oozie.command.XCommand#isLockRequired()
1137 */
1138 @Override
1139 protected boolean isLockRequired() {
1140 return false;
1141 }
1142
1143 /* (non-Javadoc)
1144 * @see org.apache.oozie.command.XCommand#loadState()
1145 */
1146 @Override
1147 protected void loadState() throws CommandException {
1148 jpaService = Services.get().get(JPAService.class);
1149 if (jpaService == null) {
1150 throw new CommandException(ErrorCode.E0610);
1151 }
1152 coordJob = new CoordinatorJobBean();
1153 if (this.bundleId != null) {
1154 // this coord job is created from bundle
1155 coordJob.setBundleId(this.bundleId);
1156 // first use bundle id if submit thru bundle
1157 logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
1158 LogUtils.setLogInfo(logInfo);
1159 }
1160 if (this.coordName != null) {
1161 // this coord job is created from bundle
1162 coordJob.setAppName(this.coordName);
1163 }
1164 setJob(coordJob);
1165
1166 }
1167
1168 /* (non-Javadoc)
1169 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
1170 */
1171 @Override
1172 protected void verifyPrecondition() throws CommandException {
1173
1174 }
1175
1176 /* (non-Javadoc)
1177 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
1178 */
1179 @Override
1180 public void notifyParent() throws CommandException {
1181 // update bundle action
1182 if (coordJob.getBundleId() != null) {
1183 LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
1184 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
1185 bundleStatusUpdate.call();
1186 }
1187 }
1188
1189 /* (non-Javadoc)
1190 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
1191 */
1192 @Override
1193 public void updateJob() throws CommandException {
1194 }
1195
1196 /* (non-Javadoc)
1197 * @see org.apache.oozie.command.TransitionXCommand#getJob()
1198 */
1199 @Override
1200 public Job getJob() {
1201 return coordJob;
1202 }
1203
1204 @Override
1205 public void performWrites() throws CommandException {
1206 }
1207 }