/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Validator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.CoordinatorJob.Execution;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.SubmitTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.service.UUIDService.ApplicationType;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.ParameterVerifier;
import org.apache.oozie.util.ParameterVerifierException;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
import org.xml.sax.SAXException;

/**
 * This class provides the functionality to resolve a coordinator job XML and write the job information into a DB
 * table.
 * <p/>
 * Specifically it performs the following functions:
 * 1. Resolve all the variables and properties using the job configuration.
 * 2. Insert the referenced dataset definitions into the <data-in> and <data-out> tags.
 * 3. Validate the XML at runtime.
 */
public class CoordSubmitXCommand extends SubmitTransitionXCommand {

    private Configuration conf;
    private final String authToken;
    private final String bundleId;
    private final String coordName;
    private boolean dryrun;
    private JPAService jpaService = null;
    private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;

    public static final String CONFIG_DEFAULT = "coord-config-default.xml";
    public static final String COORDINATOR_XML_FILE = "coordinator.xml";
    public final String COORD_INPUT_EVENTS = "input-events";
    public final String COORD_OUTPUT_EVENTS = "output-events";
    public final String COORD_INPUT_EVENTS_DATA_IN = "data-in";
    public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";

    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

    private CoordinatorJobBean coordJob = null;

    /**
     * Default timeout for normal jobs, in minutes, after which the coordinator input check will time out.
     */
    public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";

    public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";

    public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";

    public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
            + "coord.materialization.throttling.factor";

    /**
     * Default maximum timeout in minutes, after which the coordinator input check will time out.
     */
    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";

    public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";

    private ELEvaluator evalFreq = null;
    private ELEvaluator evalNofuncs = null;
    private ELEvaluator evalData = null;
    private ELEvaluator evalInst = null;
    private ELEvaluator evalAction = null;
    private ELEvaluator evalSla = null;

    static {
        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Constructor to create the Coordinator Submit Command.
     *
     * @param conf : Configuration for Coordinator job
     * @param authToken : To be used for authentication
     */
    public CoordSubmitXCommand(Configuration conf, String authToken) {
        super("coord_submit", "coord_submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
        this.bundleId = null;
        this.coordName = null;
    }

    /**
     * Constructor to create the Coordinator Submit Command for a bundle job.
     *
     * @param conf : Configuration for Coordinator job
     * @param authToken : To be used for authentication
     * @param bundleId : bundle id
     * @param coordName : coord name
     */
    public CoordSubmitXCommand(Configuration conf, String authToken, String bundleId, String coordName) {
        super("coord_submit", "coord_submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
        this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId");
        this.coordName = ParamChecker.notEmpty(coordName, "coordName");
    }

    /**
     * Constructor to create the Coordinator Submit Command.
     *
     * @param dryrun : if dryrun
     * @param conf : Configuration for Coordinator job
     * @param authToken : To be used for authentication
     */
    public CoordSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
        this(conf, authToken);
        this.dryrun = dryrun;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected String submit() throws CommandException {
        String jobId = null;
        LOG.info("STARTED Coordinator Submit");
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());

        boolean exceptionOccurred = false;
        try {
            mergeDefaultConfig();

            String appXml = readAndValidateXml();
            coordJob.setOrigJobXml(appXml);
            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());

            Element eXml = XmlUtils.parseXml(appXml);

            String appNamespace = readAppNamespace(eXml);
            coordJob.setAppNamespace(appNamespace);

            ParameterVerifier.verifyParameters(conf, eXml);

            appXml = XmlUtils.removeComments(appXml);
            initEvaluators();
            Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob);

            validateCoordinatorJob();

            // checking if the coordinator application data input/output events
            // specify multiple data instance values in an erroneous manner
            checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN);
            checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT);

            LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString());

            jobId = storeToDB(eJob, coordJob);
            // log job info for coordinator job
            LogUtils.setLogInfo(coordJob, logInfo);
            LOG = XLog.resetPrefix(LOG);

            if (!dryrun) {
                // submit a command to materialize jobs for the next 1 hour (3600 secs)
                // so we don't wait 10 mins for the Service to run.
                queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
            }
            else {
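                // Dryrun: materialize the first hour of actions in memory and return them to the caller
                // instead of queueing a materialization command against a persisted job.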
                Date startTime = coordJob.getStartTime();
                long startTimeMilli = startTime.getTime();
                long endTimeMilli = startTimeMilli + (3600 * 1000);
                Date jobEndTime = coordJob.getEndTime();
                Date endTime = new Date(endTimeMilli);
                if (endTime.compareTo(jobEndTime) > 0) {
                    endTime = jobEndTime;
                }
                jobId = coordJob.getId();
                LOG.info("[" + jobId + "]: Update status to RUNNING");
                coordJob.setStatus(Job.Status.RUNNING);
                coordJob.setPending();
                CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
                        endTime);
                Configuration jobConf = null;
                try {
                    jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
                }
                catch (IOException e1) {
                    LOG.warn("Configuration parse error reading from DB: " + coordJob.getConf(), e1);
                }
                String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
                String output = coordJob.getJobXml() + System.getProperty("line.separator")
                        + "***actions for instance***" + action;
                return output;
            }
        }
        catch (JDOMException jex) {
            exceptionOccurred = true;
            LOG.warn("ERROR: ", jex);
            throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex);
        }
        catch (CoordinatorJobException cex) {
            exceptionOccurred = true;
            LOG.warn("ERROR: ", cex);
            throw new CommandException(cex);
        }
        catch (ParameterVerifierException pex) {
            exceptionOccurred = true;
            LOG.warn("ERROR: ", pex);
            throw new CommandException(pex);
        }
        catch (IllegalArgumentException iex) {
            exceptionOccurred = true;
            LOG.warn("ERROR: ", iex);
            throw new CommandException(ErrorCode.E1003, iex);
        }
        catch (Exception ex) {
            exceptionOccurred = true;
            LOG.warn("ERROR: ", ex);
            throw new CommandException(ErrorCode.E0803, ex);
        }
        finally {
            if (exceptionOccurred) {
                if (coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")) {
                    coordJob.setStatus(CoordinatorJob.Status.FAILED);
                    coordJob.resetPending();
                }
            }
        }

        LOG.info("ENDED Coordinator Submit jobId=" + jobId);
        return jobId;
    }

    /**
     * Validates values in the coordinator definition for correctness. Placeholder to add more checks.
     */
    private void validateCoordinatorJob() {
        // check if startTime < endTime
        if (coordJob.getStartTime().after(coordJob.getEndTime())) {
            throw new IllegalArgumentException("Coordinator Start Time cannot be greater than End Time.");
        }
    }

    /*
     * Check for multiple data instance values inside a single <instance>, <start-instance> or <end-instance> tag.
     * If found, the job is not submitted and the user is asked to correct the error, instead of silently defaulting
     * to the first instance value in the list.
     */
    private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException {
        Element eventsSpec, dataSpec, instance;
        List<Element> instanceSpecList;
        Namespace ns = eCoordJob.getNamespace();
        String instanceValue;
        eventsSpec = eCoordJob.getChild(eventType, ns);
        if (eventsSpec != null) {
            dataSpec = eventsSpec.getChild(dataType, ns);
            if (dataSpec != null) {
                // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors
                instanceSpecList = dataSpec.getChildren("instance", ns);
                Iterator instanceIter = instanceSpecList.iterator();
                while (instanceIter.hasNext()) {
                    instance = ((Element) instanceIter.next());
                    if (instance.getContentSize() == 0) { // empty string or whitespace
                        throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!");
                    }
                    instanceValue = instance.getContent(0).toString();
                    boolean isInvalid = false;
                    try {
                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
                    }
                    catch (Exception e) {
                        handleELParseException(eventType, dataType, instanceValue);
                    }
                    if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0
                        handleExpressionWithMultipleInstances(eventType, dataType, instanceValue);
                    }
                }

                // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors
                instanceSpecList = dataSpec.getChildren("start-instance", ns);
                instanceIter = instanceSpecList.iterator();
                while (instanceIter.hasNext()) {
                    instance = ((Element) instanceIter.next());
                    if (instance.getContentSize() == 0) { // empty string or whitespace
                        throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!");
                    }
                    instanceValue = instance.getContent(0).toString();
                    boolean isInvalid = false;
                    try {
                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
                    }
                    catch (Exception e) {
                        handleELParseException(eventType, dataType, instanceValue);
                    }
                    if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0
                        handleExpressionWithStartMultipleInstances(eventType, dataType, instanceValue);
                    }
                }

                // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors
                instanceSpecList = dataSpec.getChildren("end-instance", ns);
                instanceIter = instanceSpecList.iterator();
                while (instanceIter.hasNext()) {
                    instance = ((Element) instanceIter.next());
                    if (instance.getContentSize() == 0) { // empty string or whitespace
                        throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!");
                    }
                    instanceValue = instance.getContent(0).toString();
                    boolean isInvalid = false;
                    try {
                        isInvalid = evalAction.checkForExistence(instanceValue, ",");
                    }
                    catch (Exception e) {
                        handleELParseException(eventType, dataType, instanceValue);
                    }
                    if (isInvalid) { // reaching this block implies end instance is not empty i.e. length > 0
                        handleExpressionWithMultipleEndInstances(eventType, dataType, instanceValue);
                    }
                }

            }
        }
    }

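    /**
     * Report an instance value that could not be parsed by the EL evaluator, so the user can fix the coordinator definition.
     */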
    private void handleELParseException(String eventType, String dataType, String instanceValue)
            throws CoordinatorJobException {
        String correctAction = null;
        if (dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
            correctAction = "Coordinator app definition should have valid <instance> tag for data-in";
        }
        else if (dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
            correctAction = "Coordinator app definition should have valid <instance> tag for data-out";
        }
        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
                + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction);
    }

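    /**
     * Report an <instance> value that resolves to more than one data instance (a comma-separated list).
     */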
    private void handleExpressionWithMultipleInstances(String eventType, String dataType, String instanceValue)
            throws CoordinatorJobException {
        String correctAction = null;
        if (dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) {
            correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance";
        }
        else if (dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) {
            correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance";
        }
        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue
                + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction);
    }

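    /**
     * Report a <start-instance> value that resolves to more than one data instance.
     */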
    private void handleExpressionWithStartMultipleInstances(String eventType, String dataType, String instanceValue)
            throws CoordinatorJobException {
        String correctAction = "Coordinator app definition should not have multiple start-instances";
        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue
                + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction);
    }

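    /**
     * Report an <end-instance> value that resolves to more than one data instance.
     */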
    private void handleExpressionWithMultipleEndInstances(String eventType, String dataType, String instanceValue)
            throws CoordinatorJobException {
        String correctAction = "Coordinator app definition should not have multiple end-instances";
        throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue
                + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction);
    }

    /**
     * Read the application XML and validate it against the coordinator schema.
     *
     * @return validated coordinator XML
     * @throws CoordinatorJobException thrown if unable to read or validate the coordinator XML
     */
    private String readAndValidateXml() throws CoordinatorJobException {
        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH),
                OozieClient.COORDINATOR_APP_PATH);
        String coordXml = readDefinition(appPath);
        validateXml(coordXml);
        return coordXml;
    }

    /**
     * Validate against the coordinator XSD file.
     *
     * @param xmlContent : Input coordinator XML
     * @throws CoordinatorJobException thrown if unable to validate the coordinator XML
     */
    private void validateXml(String xmlContent) throws CoordinatorJobException {
        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR);
        Validator validator = schema.newValidator();
        try {
            validator.validate(new StreamSource(new StringReader(xmlContent)));
        }
        catch (SAXException ex) {
            LOG.warn("SAXException :", ex);
            throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            LOG.warn("IOException :", ex);
            throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex);
        }
    }

    /**
     * Read the application XML schema namespace.
     *
     * @param coordXmlElement input coordinator XML Element
     * @return app XML namespace
     * @throws CoordinatorJobException thrown if the namespace is missing or not allowed
     */
    private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException {
        Namespace ns = coordXmlElement.getNamespace();
        if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) {
            throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace "
                    + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later");
        }
        if (ns != null) {
            return ns.getURI();
        }
        else {
            throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given");
        }
    }

    /**
     * Merge default configuration with user-defined configuration.
     *
     * @throws CommandException thrown if failed to read or merge configurations
     */
    protected void mergeDefaultConfig() throws CommandException {
        Path configDefault = null;
        try {
            String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH);
            Path coordAppPath = new Path(coordAppPathStr);
            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority());
            FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf);

            // app path could be a directory
            if (!fs.isFile(coordAppPath)) {
                configDefault = new Path(coordAppPath, CONFIG_DEFAULT);
            }
            else {
                configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT);
            }

            if (fs.exists(configDefault)) {
                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
                XConfiguration.injectDefaults(defaultConf, conf);
            }
            else {
                LOG.info("configDefault doesn't exist: " + configDefault);
            }
            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);

            // Resolve all variables in the job properties.
            // This ensures the Hadoop Configuration semantics are preserved.
            XConfiguration resolvedVarsConf = new XConfiguration();
            for (Map.Entry<String, String> entry : conf) {
                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
            }
            conf = resolvedVarsConf;
        }
        catch (IOException e) {
            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
                    + configDefault, e);
        }
        catch (HadoopAccessorException e) {
            throw new CommandException(e);
        }
        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
    }

    /**
     * Resolves all the variables that are defined in the configuration and includes the dataset definitions
     * from the dataset file into the XML.
     *
     * @param appXml : Original job XML
     * @param conf : Configuration of the job
     * @param coordJob : Coordinator job bean to be populated.
     * @return Resolved and modified job XML element.
     * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets
     * @throws Exception thrown if failed to resolve basic entities or include referred datasets
     */
    public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob)
            throws CoordinatorJobException, Exception {
        Element basicResolvedApp = resolveInitial(conf, appXml, coordJob);
        includeDataSets(basicResolvedApp, conf);
        return basicResolvedApp;
    }

    /**
     * Insert dataset definitions into the data-in and data-out tags.
     *
     * @param eAppXml : coordinator application XML
     * @param eDatasets : DataSet XML
     */
    @SuppressWarnings("unchecked")
    private void insertDataSet(Element eAppXml, Element eDatasets) {
        // Adding DS definition in the coordinator XML
        Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace());
        if (inputList != null) {
            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) {
                Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset"));
                dataIn.getContent().add(0, eDataset);
            }
        }
        Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace());
        if (outputList != null) {
            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) {
                Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset"));
                dataOut.getContent().add(0, eDataset);
            }
        }
    }

    /**
     * Find a specific dataset from a list of datasets.
     *
     * @param eDatasets : List of data sets
     * @param name : queried data set name
     * @return the matching Dataset element; otherwise throws a RuntimeException
     */
    @SuppressWarnings("unchecked")
    private static Element findDataSet(Element eDatasets, String name) {
        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
            if (eDataset.getAttributeValue("name").equals(name)) {
                eDataset = (Element) eDataset.clone();
                eDataset.detach();
                return eDataset;
            }
        }
        throw new RuntimeException("undefined dataset: " + name);
    }

    /**
     * Initialize all the required EL Evaluators.
     */
    protected void initEvaluators() {
        evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq");
        evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs");
        evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances");
        evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit");
        evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start");
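        // evalData is created later, in resolveInitial(), once the data-in/data-out names are known.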
    }

    /**
     * Resolve basic entities using the job Configuration.
     *
     * @param conf : Job configuration
     * @param appXml : Original job XML
     * @param coordJob : Coordinator job bean to be populated.
     * @return Resolved job XML element.
     * @throws CoordinatorJobException thrown if failed to resolve basic entities
     * @throws Exception thrown if failed to resolve basic entities
     */
    @SuppressWarnings("unchecked")
    protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob)
            throws CoordinatorJobException, Exception {
        Element eAppXml = XmlUtils.parseXml(appXml);
        // job's main attributes
        // frequency
        String val = resolveAttribute("frequency", eAppXml, evalFreq);
        int ival = ParamChecker.checkInteger(val, "frequency");
        ParamChecker.checkGTZero(ival, "frequency");
        coordJob.setFrequency(ival);
        TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq
                .getVariable("timeunit"));
        addAnAttribute("freq_timeunit", eAppXml, tmp.toString());
        // TimeUnit
        coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString()));
        // End Of Duration
        tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq
                .getVariable("endOfDuration"));
        addAnAttribute("end_of_duration", eAppXml, tmp.toString());
        // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean

        // Application name
        if (this.coordName == null) {
            String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf);
            coordJob.setAppName(name);
        }
        else {
            // this coord job is created from a bundle
            coordJob.setAppName(this.coordName);
        }

        // start time
        val = resolveAttribute("start", eAppXml, evalNofuncs);
        ParamChecker.checkDateOozieTZ(val, "start");
        coordJob.setStartTime(DateUtils.parseDateOozieTZ(val));
        // end time
        val = resolveAttribute("end", eAppXml, evalNofuncs);
        ParamChecker.checkDateOozieTZ(val, "end");
        coordJob.setEndTime(DateUtils.parseDateOozieTZ(val));
        // Time zone
        val = resolveAttribute("timezone", eAppXml, evalNofuncs);
        ParamChecker.checkTimeZone(val, "timezone");
        coordJob.setTimeZone(val);

        // controls
        val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
        if (val == null || val.isEmpty()) {
            val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL);
        }

        ival = ParamChecker.checkInteger(val, "timeout");
        if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) {
            ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600);
        }
        coordJob.setTimeout(ival);

        val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
        if (val == null || val.isEmpty()) {
            val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1");
        }
        ival = ParamChecker.checkInteger(val, "concurrency");
        coordJob.setConcurrency(ival);

        val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
        if (val == null || val.isEmpty()) {
            int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12);
            ival = defaultThrottle;
        }
        else {
            ival = ParamChecker.checkInteger(val, "throttle");
        }
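        // Cap the materialization throttle at a fraction of the callable queue size so a single job
        // cannot flood the queue; values outside (0, maxThrottle] fall back to the computed maximum.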
        int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000);
        float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f);
        int maxThrottle = (int) (maxQueue * factor);
        if (ival > maxThrottle || ival < 1) {
            ival = maxThrottle;
        }
        LOG.debug("max throttle " + ival);
        coordJob.setMatThrottling(ival);

        val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs);
        if (val == null || val.isEmpty()) {
            val = Execution.FIFO.toString();
        }
        coordJob.setExecution(Execution.valueOf(val));
        String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
        ParamChecker.isMember(val, acceptedVals, "execution");

        // datasets
        resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs);
        // for each data set
        resolveDataSets(eAppXml);
        HashMap<String, String> dataNameList = new HashMap<String, String>();
        resolveIOEvents(eAppXml, dataNameList);

        resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
                eAppXml.getNamespace()), evalNofuncs);
        // TODO: If the action or workflow tag is missing, a NullPointerException will occur
        Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
                eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace());
        evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList);
        if (configElem != null) {
            for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) {
                resolveTagContents("name", propElem, evalData);
                // Want to check the data integrity of property values but don't want to modify the XML,
                // so resolve values on a clone
                Element tmpProp = (Element) propElem.clone();
                resolveTagContents("value", tmpProp, evalData);
            }
        }
        resolveSLA(eAppXml, coordJob);
        return eAppXml;
    }

    /**
     * Resolve SLA events.
     *
     * @param eAppXml job XML
     * @param coordJob coordinator job bean
     * @throws CommandException thrown if failed to resolve SLA events
     */
    private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException {
        Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info",
                Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));

        if (eSla != null) {
            String slaXml = XmlUtils.prettyPrint(eSla).toString();
            try {
                // EL evaluation
                slaXml = evalSla.evaluate(slaXml, String.class);
                // Validate against the SLA XSD
                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
            }
            catch (Exception e) {
                throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e);
            }
        }
    }

    /**
     * Resolve input-events/data-in and output-events/data-out tags.
     *
     * @param eJobOrg : Job element (it is cloned; the original is not modified)
     * @param dataNameList : map populated with the data-in/data-out event names
     * @throws CoordinatorJobException thrown if failed to resolve input and output events
     */
    @SuppressWarnings("unchecked")
    private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException {
        // Resolving input-events/data-in
        // Clone the job and don't update anything in the original
        Element eJob = (Element) eJobOrg.clone();
        Element inputList = eJob.getChild("input-events", eJob.getNamespace());
        if (inputList != null) {
            TreeSet<String> eventNameSet = new TreeSet<String>();
            for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) {
                String dataInName = dataIn.getAttributeValue("name");
                dataNameList.put(dataInName, "data-in");
                // check whether there is any duplicate data-in name
                if (eventNameSet.contains(dataInName)) {
                    throw new RuntimeException("Duplicate dataIn name " + dataInName);
                }
                else {
                    eventNameSet.add(dataInName);
                }
                resolveTagContents("instance", dataIn, evalInst);
                resolveTagContents("start-instance", dataIn, evalInst);
                resolveTagContents("end-instance", dataIn, evalInst);
            }
        }
        // Resolving output-events/data-out
        Element outputList = eJob.getChild("output-events", eJob.getNamespace());
        if (outputList != null) {
            TreeSet<String> eventNameSet = new TreeSet<String>();
            for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) {
                String dataOutName = dataOut.getAttributeValue("name");
                dataNameList.put(dataOutName, "data-out");
                // check whether there is any duplicate data-out name
                if (eventNameSet.contains(dataOutName)) {
                    throw new RuntimeException("Duplicate dataOut name " + dataOutName);
                }
                else {
                    eventNameSet.add(dataOutName);
                }
                resolveTagContents("instance", dataOut, evalInst);
            }
        }
    }

    /**
     * Add an attribute to an XML element.
     *
     * @param attrName : attribute name
     * @param elem : Element to add the attribute to
     * @param value : Value of the attribute
     */
    private void addAnAttribute(String attrName, Element elem, String value) {
        elem.setAttribute(attrName, value);
    }

    /**
     * Resolve datasets using job configuration.
     *
     * @param eAppXml : Job Element XML
     * @throws Exception thrown if failed to resolve datasets
     */
    @SuppressWarnings("unchecked")
    private void resolveDataSets(Element eAppXml) throws Exception {
        Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace());
        if (datasetList != null) {
            List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace());
            resolveDataSets(dsElems);
            resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
                    eAppXml.getNamespace()), evalNofuncs);
        }
    }

    /**
     * Resolve datasets using job configuration.
     *
     * @param dsElems : Dataset XML elements.
     * @throws CoordinatorJobException thrown if failed to resolve datasets
     */
    private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException {
        for (Element dsElem : dsElems) {
            // Setting up default timeunit and endOfDuration
            evalFreq.setVariable("timeunit", TimeUnit.MINUTE);
            evalFreq.setVariable("endOfDuration", TimeUnit.NONE);

            String val = resolveAttribute("frequency", dsElem, evalFreq);
            int ival = ParamChecker.checkInteger(val, "frequency");
            ParamChecker.checkGTZero(ival, "frequency");
            addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE
                    .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString());
            addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE
                    .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString());
            val = resolveAttribute("initial-instance", dsElem, evalNofuncs);
            ParamChecker.checkDateOozieTZ(val, "initial-instance");
            checkInitialInstance(val);
            val = resolveAttribute("timezone", dsElem, evalNofuncs);
            ParamChecker.checkTimeZone(val, "timezone");
            resolveTagContents("uri-template", dsElem, evalNofuncs);
            resolveTagContents("done-flag", dsElem, evalNofuncs);
        }
    }

    /**
     * Resolve the content of a tag.
     *
     * @param tagName : Tag name in the job XML, e.g. <timeout> 10 </timeout>
     * @param elem : Element where the tag exists.
     * @param eval : EL evaluator
     * @return Resolved tag content.
     * @throws CoordinatorJobException thrown if failed to resolve tag content
     */
    @SuppressWarnings("unchecked")
    private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
        String ret = "";
        if (elem != null) {
            for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) {
                if (tagElem != null) {
                    String updated;
                    try {
                        updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim());
                    }
                    catch (Exception e) {
                        throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
                    }
                    tagElem.removeContent();
                    tagElem.addContent(updated);
                    ret += updated;
                }
            }
        }
        return ret;
    }

    /**
     * Resolve an attribute value.
     *
     * @param attrName : Attribute name.
     * @param elem : XML Element where the attribute is defined
     * @param eval : ELEvaluator used to resolve
     * @return Resolved attribute value
     * @throws CoordinatorJobException thrown if failed to resolve an attribute value
     */
    private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
        Attribute attr = elem.getAttribute(attrName);
        String val = null;
        if (attr != null) {
            try {
                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
            }
            catch (Exception e) {
                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
            }
            attr.setValue(val);
        }
        return val;
    }

    /**
     * Include referred datasets into the XML.
     *
     * @param resolvedXml : Job XML element.
     * @param conf : Job configuration
     * @throws CoordinatorJobException thrown if failed to include referred datasets into the XML
     */
    @SuppressWarnings("unchecked")
    protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
        Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
        Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
        List<String> dsList = new ArrayList<String>();
        if (datasets != null) {
            for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
                String incDSFile = includeElem.getTextTrim();
                includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
            }
            for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
                String dsName = e.getAttributeValue("name");
                if (dsList.contains(dsName)) { // Override with this DS
                    // Remove old DS
                    removeDataSet(allDataSets, dsName);
                }
                else {
                    dsList.add(dsName);
                }
                allDataSets.addContent((Element) e.clone());
            }
        }
        insertDataSet(resolvedXml, allDataSets);
        resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
    }

    /**
     * Include one dataset file.
     *
     * @param incDSFile : Included dataset filename.
     * @param dsList : List of dataset names used to detect duplicates.
     * @param allDataSets : Element that includes all dataset definitions.
     * @param dsNameSpace : Dataset namespace
     * @throws CoordinatorJobException thrown if failed to include one dataset file
     */
    @SuppressWarnings("unchecked")
    private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
            throws CoordinatorJobException {
        Element tmpDataSets = null;
        try {
            String dsXml = readDefinition(incDSFile);
            LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
            tmpDataSets = XmlUtils.parseXml(dsXml);
        }
        catch (JDOMException e) {
            LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage());
            throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
        }
        resolveDataSets(tmpDataSets.getChildren("dataset"));
        for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
            String dsName = e.getAttributeValue("name");
            if (dsList.contains(dsName)) {
                throw new RuntimeException("Duplicate Dataset " + dsName);
            }
            dsList.add(dsName);
            Element tmp = (Element) e.clone();
            // TODO: Don't like to over-write the external/include DS's namespace
            tmp.setNamespace(dsNameSpace);
            tmp.getChild("uri-template").setNamespace(dsNameSpace);
            if (e.getChild("done-flag") != null) {
                tmp.getChild("done-flag").setNamespace(dsNameSpace);
            }
            allDataSets.addContent(tmp);
        }
        // nested include
        for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
            String incFile = includeElem.getTextTrim();
            includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace);
        }
    }

    /**
     * Remove a dataset from a list of datasets.
     *
     * @param eDatasets : List of datasets
     * @param name : Dataset name to be removed.
     */
    @SuppressWarnings("unchecked")
    private static void removeDataSet(Element eDatasets, String name) {
        for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) {
            if (eDataset.getAttributeValue("name").equals(name)) {
                eDataset.detach();
                return;
            }
        }
        throw new RuntimeException("undefined dataset: " + name);
    }

    /**
     * Read the coordinator definition.
     *
     * @param appPath application path.
     * @return coordinator definition.
     * @throws CoordinatorJobException thrown if the definition could not be read.
     */
    protected String readDefinition(String appPath) throws CoordinatorJobException {
        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
        // Configuration confHadoop = CoordUtils.getHadoopConf(conf);
        try {
            URI uri = new URI(appPath);
            LOG.debug("user =" + user);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(user, uri, fsConf);
            Path appDefPath = null;

            // app path could be a directory
            Path path = new Path(uri.getPath());
            // check that the file exists for a dataset include file; the app xml is already checked
            if (!fs.exists(path)) {
                throw new URISyntaxException(path.toString(), "path does not exist: " + path.toString());
            }
            if (!fs.isFile(path)) {
                appDefPath = new Path(path, COORDINATOR_XML_FILE);
            }
            else {
                appDefPath = path;
            }

            Reader reader = new InputStreamReader(fs.open(appDefPath));
            StringWriter writer = new StringWriter();
            IOUtils.copyCharStream(reader, writer);
            return writer.toString();
        }
        catch (IOException ex) {
            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            LOG.warn("URISyntaxException :" + ex.getMessage());
            throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new CoordinatorJobException(ex);
        }
        catch (Exception ex) {
            LOG.warn("Exception :", ex);
            throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);
        }
    }

    /**
     * Write a coordinator job into the database.
     *
     * @param eJob : XML element of job
     * @param coordJob : Coordinator job bean
     * @return Job id
     * @throws CommandException thrown if unable to save the coordinator job to the DB
     */
    private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException {
        String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR);
        coordJob.setId(jobId);
        coordJob.setAuthToken(this.authToken);

        coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH));
        coordJob.setCreatedTime(new Date());
        coordJob.setUser(conf.get(OozieClient.USER_NAME));
        String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
        coordJob.setGroup(group);
        coordJob.setConf(XmlUtils.prettyPrint(conf).toString());
        coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString());
        coordJob.setLastActionNumber(0);
        coordJob.setLastModifiedTime(new Date());

        if (!dryrun) {
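            // Only persist the job for real submissions; a dryrun returns the materialized actions
            // from submit() without writing anything to the database.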
            coordJob.setLastModifiedTime(new Date());
            try {
                jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
        }
        return jobId;
    }

    /*
     * This method checks that the initial-instance specified for a particular dataset
     * is not a date earlier than the oozie server default of Jan 01, 1970 00:00Z UTC.
     */
    private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
        Date initialInstance, givenInstance;
        try {
            initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
            givenInstance = DateUtils.parseDateOozieTZ(val);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val +
                    "' to Date object. ", e);
        }
        if (givenInstance.compareTo(initialInstance) < 0) {
            throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val +
                    " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance));
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
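        // A coordinator submission creates a brand new job, so there is no existing entity to key a lock on
        // (see isLockRequired below).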
        return null;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        coordJob = new CoordinatorJobBean();
        if (this.bundleId != null) {
            // this coord job is created from a bundle
            coordJob.setBundleId(this.bundleId);
            // use the bundle id first if submitted through a bundle
            logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
            LogUtils.setLogInfo(logInfo);
        }
        if (this.coordName != null) {
            // this coord job is created from a bundle
            coordJob.setAppName(this.coordName);
        }
        setJob(coordJob);
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException {
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() throws CommandException {
        // update bundle action
        if (coordJob.getBundleId() != null) {
            LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
            bundleStatusUpdate.call();
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() throws CommandException {
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }

    @Override
    public void performWrites() throws CommandException {
    }
}
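
/*
 * Minimal usage sketch (not part of the original file): roughly how this command is driven, e.g. by the
 * coordinator engine, assuming an initialized Oozie Services context. The property values below are
 * hypothetical placeholders; call() eventually invokes submit() above and returns the new coordinator job id.
 *
 *   Configuration submitConf = new XConfiguration();
 *   submitConf.set(OozieClient.COORDINATOR_APP_PATH, "hdfs://namenode:8020/user/joe/coord-app");
 *   submitConf.set(OozieClient.USER_NAME, "joe");
 *   String coordJobId = new CoordSubmitXCommand(submitConf, "dummyAuthToken").call();
 */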