001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.InputStreamReader; 022 import java.io.Reader; 023 import java.io.StringReader; 024 import java.io.StringWriter; 025 import java.net.URI; 026 import java.net.URISyntaxException; 027 import java.util.ArrayList; 028 import java.util.Date; 029 import java.util.HashMap; 030 import java.util.HashSet; 031 import java.util.Iterator; 032 import java.util.List; 033 import java.util.Map; 034 import java.util.Set; 035 import java.util.TreeSet; 036 037 import javax.xml.transform.stream.StreamSource; 038 import javax.xml.validation.Validator; 039 040 import org.apache.hadoop.conf.Configuration; 041 import org.apache.hadoop.fs.FileSystem; 042 import org.apache.hadoop.fs.Path; 043 import org.apache.oozie.CoordinatorJobBean; 044 import org.apache.oozie.ErrorCode; 045 import org.apache.oozie.client.CoordinatorJob; 046 import org.apache.oozie.client.Job; 047 import org.apache.oozie.client.OozieClient; 048 import org.apache.oozie.client.CoordinatorJob.Execution; 049 import org.apache.oozie.command.CommandException; 050 import org.apache.oozie.command.SubmitTransitionXCommand; 051 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 052 import org.apache.oozie.coord.CoordELEvaluator; 053 import org.apache.oozie.coord.CoordELFunctions; 054 import org.apache.oozie.coord.CoordinatorJobException; 055 import org.apache.oozie.coord.TimeUnit; 056 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; 057 import org.apache.oozie.executor.jpa.JPAExecutorException; 058 import org.apache.oozie.service.DagXLogInfoService; 059 import org.apache.oozie.service.HadoopAccessorException; 060 import org.apache.oozie.service.HadoopAccessorService; 061 import org.apache.oozie.service.JPAService; 062 import org.apache.oozie.service.SchemaService; 063 import org.apache.oozie.service.Service; 064 import org.apache.oozie.service.Services; 065 import org.apache.oozie.service.UUIDService; 066 import org.apache.oozie.service.SchemaService.SchemaName; 067 import org.apache.oozie.service.UUIDService.ApplicationType; 068 import org.apache.oozie.util.ConfigUtils; 069 import org.apache.oozie.util.DateUtils; 070 import org.apache.oozie.util.ELEvaluator; 071 import org.apache.oozie.util.ELUtils; 072 import org.apache.oozie.util.IOUtils; 073 import org.apache.oozie.util.InstrumentUtils; 074 import org.apache.oozie.util.LogUtils; 075 import org.apache.oozie.util.ParamChecker; 076 import org.apache.oozie.util.ParameterVerifier; 077 import org.apache.oozie.util.ParameterVerifierException; 078 import org.apache.oozie.util.PropertiesUtils; 079 import org.apache.oozie.util.XConfiguration; 080 import org.apache.oozie.util.XLog; 081 import org.apache.oozie.util.XmlUtils; 082 import org.jdom.Attribute; 083 import org.jdom.Element; 084 import org.jdom.JDOMException; 085 import org.jdom.Namespace; 086 import org.xml.sax.SAXException; 087 088 /** 089 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB 090 * table. 091 * <p/> 092 * Specifically it performs the following functions: 1. Resolve all the variables or properties using job 093 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML 094 * at runtime. 095 */ 096 public class CoordSubmitXCommand extends SubmitTransitionXCommand { 097 098 private Configuration conf; 099 private final String bundleId; 100 private final String coordName; 101 private boolean dryrun; 102 private JPAService jpaService = null; 103 private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP; 104 105 public static final String CONFIG_DEFAULT = "coord-config-default.xml"; 106 public static final String COORDINATOR_XML_FILE = "coordinator.xml"; 107 public final String COORD_INPUT_EVENTS ="input-events"; 108 public final String COORD_OUTPUT_EVENTS = "output-events"; 109 public final String COORD_INPUT_EVENTS_DATA_IN ="data-in"; 110 public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out"; 111 112 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 113 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 114 115 private CoordinatorJobBean coordJob = null; 116 /** 117 * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout 118 */ 119 public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout"; 120 121 public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency"; 122 123 public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle"; 124 125 public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX 126 + "coord.materialization.throttling.factor"; 127 128 /** 129 * Default MAX timeout in minutes, after which coordinator input check will timeout 130 */ 131 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; 132 133 134 public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size"; 135 136 private ELEvaluator evalFreq = null; 137 private ELEvaluator evalNofuncs = null; 138 private ELEvaluator evalData = null; 139 private ELEvaluator evalInst = null; 140 private ELEvaluator evalAction = null; 141 private ELEvaluator evalSla = null; 142 143 static { 144 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 145 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 146 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 147 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 148 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 149 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 150 151 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 152 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 153 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 154 } 155 156 /** 157 * Constructor to create the Coordinator Submit Command. 158 * 159 * @param conf : Configuration for Coordinator job 160 */ 161 public CoordSubmitXCommand(Configuration conf) { 162 super("coord_submit", "coord_submit", 1); 163 this.conf = ParamChecker.notNull(conf, "conf"); 164 this.bundleId = null; 165 this.coordName = null; 166 } 167 168 /** 169 * Constructor to create the Coordinator Submit Command by bundle job. 170 * 171 * @param conf : Configuration for Coordinator job 172 * @param bundleId : bundle id 173 * @param coordName : coord name 174 */ 175 public CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) { 176 super("coord_submit", "coord_submit", 1); 177 this.conf = ParamChecker.notNull(conf, "conf"); 178 this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId"); 179 this.coordName = ParamChecker.notEmpty(coordName, "coordName"); 180 } 181 182 /** 183 * Constructor to create the Coordinator Submit Command. 184 * 185 * @param dryrun : if dryrun 186 * @param conf : Configuration for Coordinator job 187 */ 188 public CoordSubmitXCommand(boolean dryrun, Configuration conf) { 189 this(conf); 190 this.dryrun = dryrun; 191 } 192 193 /* (non-Javadoc) 194 * @see org.apache.oozie.command.XCommand#execute() 195 */ 196 @Override 197 protected String submit() throws CommandException { 198 String jobId = null; 199 LOG.info("STARTED Coordinator Submit"); 200 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 201 202 boolean exceptionOccured = false; 203 try { 204 mergeDefaultConfig(); 205 206 String appXml = readAndValidateXml(); 207 coordJob.setOrigJobXml(appXml); 208 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 209 210 Element eXml = XmlUtils.parseXml(appXml); 211 212 String appNamespace = readAppNamespace(eXml); 213 coordJob.setAppNamespace(appNamespace); 214 215 ParameterVerifier.verifyParameters(conf, eXml); 216 217 appXml = XmlUtils.removeComments(appXml); 218 initEvaluators(); 219 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob); 220 221 validateCoordinatorJob(); 222 223 // checking if the coordinator application data input/output events 224 // specify multiple data instance values in erroneous manner 225 checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN); 226 checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT); 227 228 LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString()); 229 230 jobId = storeToDB(eJob, coordJob); 231 // log job info for coordinator job 232 LogUtils.setLogInfo(coordJob, logInfo); 233 LOG = XLog.resetPrefix(LOG); 234 235 if (!dryrun) { 236 // submit a command to materialize jobs for the next 1 hour (3600 secs) 237 // so we don't wait 10 mins for the Service to run. 238 queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100); 239 } 240 else { 241 Date startTime = coordJob.getStartTime(); 242 long startTimeMilli = startTime.getTime(); 243 long endTimeMilli = startTimeMilli + (3600 * 1000); 244 Date jobEndTime = coordJob.getEndTime(); 245 Date endTime = new Date(endTimeMilli); 246 if (endTime.compareTo(jobEndTime) > 0) { 247 endTime = jobEndTime; 248 } 249 jobId = coordJob.getId(); 250 LOG.info("[" + jobId + "]: Update status to RUNNING"); 251 coordJob.setStatus(Job.Status.RUNNING); 252 coordJob.setPending(); 253 CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime, 254 endTime); 255 Configuration jobConf = null; 256 try { 257 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 258 } 259 catch (IOException e1) { 260 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1); 261 } 262 String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null); 263 String output = coordJob.getJobXml() + System.getProperty("line.separator") 264 + "***actions for instance***" + action; 265 return output; 266 } 267 } 268 catch (JDOMException jex) { 269 exceptionOccured = true; 270 LOG.warn("ERROR: ", jex); 271 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex); 272 } 273 catch (CoordinatorJobException cex) { 274 exceptionOccured = true; 275 LOG.warn("ERROR: ", cex); 276 throw new CommandException(cex); 277 } 278 catch (ParameterVerifierException pex) { 279 exceptionOccured = true; 280 LOG.warn("ERROR: ", pex); 281 throw new CommandException(pex); 282 } 283 catch (IllegalArgumentException iex) { 284 exceptionOccured = true; 285 LOG.warn("ERROR: ", iex); 286 throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex); 287 } 288 catch (Exception ex) { 289 exceptionOccured = true; 290 LOG.warn("ERROR: ", ex); 291 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 292 } 293 finally { 294 if (exceptionOccured) { 295 if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){ 296 coordJob.setStatus(CoordinatorJob.Status.FAILED); 297 coordJob.resetPending(); 298 } 299 } 300 } 301 302 LOG.info("ENDED Coordinator Submit jobId=" + jobId); 303 return jobId; 304 } 305 306 /** 307 * Method that validates values in the definition for correctness. Placeholder to add more. 308 */ 309 private void validateCoordinatorJob() { 310 // check if startTime < endTime 311 if (coordJob.getStartTime().after(coordJob.getEndTime())) { 312 throw new IllegalArgumentException("Coordinator Start Time cannot be greater than End Time."); 313 } 314 } 315 316 /* 317 * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag 318 * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list 319 */ 320 private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException { 321 Element eventsSpec, dataSpec, instance; 322 List<Element> instanceSpecList; 323 Namespace ns = eCoordJob.getNamespace(); 324 String instanceValue; 325 eventsSpec = eCoordJob.getChild(eventType, ns); 326 if (eventsSpec != null) { 327 dataSpec = eventsSpec.getChild(dataType, ns); 328 if (dataSpec != null) { 329 // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors 330 instanceSpecList = dataSpec.getChildren("instance", ns); 331 Iterator instanceIter = instanceSpecList.iterator(); 332 while(instanceIter.hasNext()) { 333 instance = ((Element) instanceIter.next()); 334 if(instance.getContentSize() == 0) { //empty string or whitespace 335 throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!"); 336 } 337 instanceValue = instance.getContent(0).toString(); 338 boolean isInvalid = false; 339 try { 340 isInvalid = evalAction.checkForExistence(instanceValue, ","); 341 } catch (Exception e) { 342 handleELParseException(eventType, dataType, instanceValue); 343 } 344 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0 345 handleExpresionWithMultipleInstances(eventType, dataType, instanceValue); 346 } 347 } 348 349 // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors 350 instanceSpecList = dataSpec.getChildren("start-instance", ns); 351 instanceIter = instanceSpecList.iterator(); 352 while(instanceIter.hasNext()) { 353 instance = ((Element) instanceIter.next()); 354 if(instance.getContentSize() == 0) { //empty string or whitespace 355 throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!"); 356 } 357 instanceValue = instance.getContent(0).toString(); 358 boolean isInvalid = false; 359 try { 360 isInvalid = evalAction.checkForExistence(instanceValue, ","); 361 } catch (Exception e) { 362 handleELParseException(eventType, dataType, instanceValue); 363 } 364 if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0 365 handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue); 366 } 367 } 368 369 // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors 370 instanceSpecList = dataSpec.getChildren("end-instance", ns); 371 instanceIter = instanceSpecList.iterator(); 372 while(instanceIter.hasNext()) { 373 instance = ((Element) instanceIter.next()); 374 if(instance.getContentSize() == 0) { //empty string or whitespace 375 throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!"); 376 } 377 instanceValue = instance.getContent(0).toString(); 378 boolean isInvalid = false; 379 try { 380 isInvalid = evalAction.checkForExistence(instanceValue, ","); 381 } catch (Exception e) { 382 handleELParseException(eventType, dataType, instanceValue); 383 } 384 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0 385 handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue); 386 } 387 } 388 389 } 390 } 391 } 392 393 private void handleELParseException(String eventType, String dataType, String instanceValue) 394 throws CoordinatorJobException { 395 String correctAction = null; 396 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 397 correctAction = "Coordinator app definition should have valid <instance> tag for data-in"; 398 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 399 correctAction = "Coordinator app definition should have valid <instance> tag for data-out"; 400 } 401 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 402 + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction); 403 } 404 405 private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue) 406 throws CoordinatorJobException { 407 String correctAction = null; 408 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 409 correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance"; 410 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 411 correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance"; 412 } 413 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 414 + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction); 415 } 416 417 private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue) 418 throws CoordinatorJobException { 419 String correctAction = "Coordinator app definition should not have multiple start-instances"; 420 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue 421 + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction); 422 } 423 424 private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue) 425 throws CoordinatorJobException { 426 String correctAction = "Coordinator app definition should not have multiple end-instances"; 427 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue 428 + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction); 429 } 430 431 /** 432 * Read the application XML and validate against coordinator Schema 433 * 434 * @return validated coordinator XML 435 * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml 436 */ 437 private String readAndValidateXml() throws CoordinatorJobException { 438 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH), 439 OozieClient.COORDINATOR_APP_PATH); 440 String coordXml = readDefinition(appPath); 441 validateXml(coordXml); 442 return coordXml; 443 } 444 445 /** 446 * Validate against Coordinator XSD file 447 * 448 * @param xmlContent : Input coordinator xml 449 * @throws CoordinatorJobException thrown if unable to validate coordinator xml 450 */ 451 private void validateXml(String xmlContent) throws CoordinatorJobException { 452 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR); 453 Validator validator = schema.newValidator(); 454 try { 455 validator.validate(new StreamSource(new StringReader(xmlContent))); 456 } 457 catch (SAXException ex) { 458 LOG.warn("SAXException :", ex); 459 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex); 460 } 461 catch (IOException ex) { 462 LOG.warn("IOException :", ex); 463 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex); 464 } 465 } 466 467 /** 468 * Read the application XML schema namespace 469 * 470 * @param coordXmlElement input coordinator xml Element 471 * @return app xml namespace 472 * @throws CoordinatorJobException 473 */ 474 private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException { 475 Namespace ns = coordXmlElement.getNamespace(); 476 if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 477 throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace " 478 + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later"); 479 } 480 if (ns != null) { 481 return ns.getURI(); 482 } 483 else { 484 throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given"); 485 } 486 } 487 488 /** 489 * Merge default configuration with user-defined configuration. 490 * 491 * @throws CommandException thrown if failed to read or merge configurations 492 */ 493 protected void mergeDefaultConfig() throws CommandException { 494 Path configDefault = null; 495 try { 496 String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH); 497 Path coordAppPath = new Path(coordAppPathStr); 498 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 499 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 500 Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority()); 501 FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf); 502 503 // app path could be a directory 504 if (!fs.isFile(coordAppPath)) { 505 configDefault = new Path(coordAppPath, CONFIG_DEFAULT); 506 } else { 507 configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT); 508 } 509 510 if (fs.exists(configDefault)) { 511 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 512 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 513 XConfiguration.injectDefaults(defaultConf, conf); 514 } 515 else { 516 LOG.info("configDefault Doesn't exist " + configDefault); 517 } 518 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 519 520 // Resolving all variables in the job properties. 521 // This ensures the Hadoop Configuration semantics is preserved. 522 XConfiguration resolvedVarsConf = new XConfiguration(); 523 for (Map.Entry<String, String> entry : conf) { 524 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 525 } 526 conf = resolvedVarsConf; 527 } 528 catch (IOException e) { 529 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 530 + configDefault, e); 531 } 532 catch (HadoopAccessorException e) { 533 throw new CommandException(e); 534 } 535 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 536 } 537 538 /** 539 * The method resolve all the variables that are defined in configuration. It also include the data set definition 540 * from dataset file into XML. 541 * 542 * @param appXml : Original job XML 543 * @param conf : Configuration of the job 544 * @param coordJob : Coordinator job bean to be populated. 545 * @return Resolved and modified job XML element. 546 * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets 547 * @throws Exception thrown if failed to resolve basic entities or include referred datasets 548 */ 549 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob) 550 throws CoordinatorJobException, Exception { 551 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob); 552 includeDataSets(basicResolvedApp, conf); 553 return basicResolvedApp; 554 } 555 556 /** 557 * Insert data set into data-in and data-out tags. 558 * 559 * @param eAppXml : coordinator application XML 560 * @param eDatasets : DataSet XML 561 */ 562 @SuppressWarnings("unchecked") 563 private void insertDataSet(Element eAppXml, Element eDatasets) { 564 // Adding DS definition in the coordinator XML 565 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 566 if (inputList != null) { 567 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 568 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset")); 569 dataIn.getContent().add(0, eDataset); 570 } 571 } 572 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 573 if (outputList != null) { 574 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 575 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset")); 576 dataOut.getContent().add(0, eDataset); 577 } 578 } 579 } 580 581 /** 582 * Find a specific dataset from a list of Datasets. 583 * 584 * @param eDatasets : List of data sets 585 * @param name : queried data set name 586 * @return one Dataset element. otherwise throw Exception 587 */ 588 @SuppressWarnings("unchecked") 589 private static Element findDataSet(Element eDatasets, String name) { 590 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 591 if (eDataset.getAttributeValue("name").equals(name)) { 592 eDataset = (Element) eDataset.clone(); 593 eDataset.detach(); 594 return eDataset; 595 } 596 } 597 throw new RuntimeException("undefined dataset: " + name); 598 } 599 600 /** 601 * Initialize all the required EL Evaluators. 602 */ 603 protected void initEvaluators() { 604 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq"); 605 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs"); 606 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances"); 607 evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit"); 608 evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start"); 609 } 610 611 /** 612 * Resolve basic entities using job Configuration. 613 * 614 * @param conf :Job configuration 615 * @param appXml : Original job XML 616 * @param coordJob : Coordinator job bean to be populated. 617 * @return Resolved job XML element. 618 * @throws CoordinatorJobException thrown if failed to resolve basic entities 619 * @throws Exception thrown if failed to resolve basic entities 620 */ 621 @SuppressWarnings("unchecked") 622 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob) 623 throws CoordinatorJobException, Exception { 624 Element eAppXml = XmlUtils.parseXml(appXml); 625 // job's main attributes 626 // frequency 627 String val = resolveAttribute("frequency", eAppXml, evalFreq); 628 int ival = ParamChecker.checkInteger(val, "frequency"); 629 ParamChecker.checkGTZero(ival, "frequency"); 630 coordJob.setFrequency(Integer.toString(ival)); 631 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq 632 .getVariable("timeunit")); 633 addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); 634 // TimeUnit 635 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString())); 636 // End Of Duration 637 tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq 638 .getVariable("endOfDuration")); 639 addAnAttribute("end_of_duration", eAppXml, tmp.toString()); 640 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean 641 642 // Application name 643 if (this.coordName == null) { 644 String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf); 645 coordJob.setAppName(name); 646 } 647 else { 648 // this coord job is created from bundle 649 coordJob.setAppName(this.coordName); 650 } 651 652 // start time 653 val = resolveAttribute("start", eAppXml, evalNofuncs); 654 ParamChecker.checkDateOozieTZ(val, "start"); 655 coordJob.setStartTime(DateUtils.parseDateOozieTZ(val)); 656 // end time 657 val = resolveAttribute("end", eAppXml, evalNofuncs); 658 ParamChecker.checkDateOozieTZ(val, "end"); 659 coordJob.setEndTime(DateUtils.parseDateOozieTZ(val)); 660 // Time zone 661 val = resolveAttribute("timezone", eAppXml, evalNofuncs); 662 ParamChecker.checkTimeZone(val, "timezone"); 663 coordJob.setTimeZone(val); 664 665 // controls 666 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalFreq); 667 if (val == "") { 668 val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL); 669 } 670 671 ival = ParamChecker.checkInteger(val, "timeout"); 672 if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) { 673 ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600); 674 } 675 coordJob.setTimeout(ival); 676 677 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 678 if (val == null || val.isEmpty()) { 679 val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1"); 680 } 681 ival = ParamChecker.checkInteger(val, "concurrency"); 682 coordJob.setConcurrency(ival); 683 684 val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 685 if (val == null || val.isEmpty()) { 686 int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12); 687 ival = defaultThrottle; 688 } 689 else { 690 ival = ParamChecker.checkInteger(val, "throttle"); 691 } 692 int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000); 693 float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f); 694 int maxThrottle = (int) (maxQueue * factor); 695 if (ival > maxThrottle || ival < 1) { 696 ival = maxThrottle; 697 } 698 LOG.debug("max throttle " + ival); 699 coordJob.setMatThrottling(ival); 700 701 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 702 if (val == "") { 703 val = Execution.FIFO.toString(); 704 } 705 coordJob.setExecution(Execution.valueOf(val)); 706 String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() }; 707 ParamChecker.isMember(val, acceptedVals, "execution"); 708 709 // datasets 710 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs); 711 // for each data set 712 resolveDataSets(eAppXml); 713 HashMap<String, String> dataNameList = new HashMap<String, String>(); 714 resolveIOEvents(eAppXml, dataNameList); 715 716 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 717 eAppXml.getNamespace()), evalNofuncs); 718 // TODO: If action or workflow tag is missing, NullPointerException will 719 // occur 720 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 721 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace()); 722 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList); 723 if (configElem != null) { 724 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 725 resolveTagContents("name", propElem, evalData); 726 // Want to check the data-integrity but don't want to modify the 727 // XML 728 // for properties only 729 Element tmpProp = (Element) propElem.clone(); 730 resolveTagContents("value", tmpProp, evalData); 731 } 732 } 733 resolveSLA(eAppXml, coordJob); 734 return eAppXml; 735 } 736 737 /** 738 * Resolve SLA events 739 * 740 * @param eAppXml job XML 741 * @param coordJob coordinator job bean 742 * @throws CommandException thrown if failed to resolve sla events 743 */ 744 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException { 745 Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace())); 746 747 if (eSla != null) { 748 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 749 try { 750 // EL evaluation 751 slaXml = evalSla.evaluate(slaXml, String.class); 752 // Validate against semantic SXD 753 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 754 } 755 catch (Exception e) { 756 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e); 757 } 758 } 759 } 760 761 /** 762 * Resolve input-events/data-in and output-events/data-out tags. 763 * 764 * @param eJob : Job element 765 * @throws CoordinatorJobException thrown if failed to resolve input and output events 766 */ 767 @SuppressWarnings("unchecked") 768 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException { 769 // Resolving input-events/data-in 770 // Clone the job and don't update anything in the original 771 Element eJob = (Element) eJobOrg.clone(); 772 Element inputList = eJob.getChild("input-events", eJob.getNamespace()); 773 if (inputList != null) { 774 TreeSet<String> eventNameSet = new TreeSet<String>(); 775 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) { 776 String dataInName = dataIn.getAttributeValue("name"); 777 dataNameList.put(dataInName, "data-in"); 778 // check whether there is any duplicate data-in name 779 if (eventNameSet.contains(dataInName)) { 780 throw new RuntimeException("Duplicate dataIn name " + dataInName); 781 } 782 else { 783 eventNameSet.add(dataInName); 784 } 785 resolveTagContents("instance", dataIn, evalInst); 786 resolveTagContents("start-instance", dataIn, evalInst); 787 resolveTagContents("end-instance", dataIn, evalInst); 788 } 789 } 790 // Resolving output-events/data-out 791 Element outputList = eJob.getChild("output-events", eJob.getNamespace()); 792 if (outputList != null) { 793 TreeSet<String> eventNameSet = new TreeSet<String>(); 794 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) { 795 String dataOutName = dataOut.getAttributeValue("name"); 796 dataNameList.put(dataOutName, "data-out"); 797 // check whether there is any duplicate data-out name 798 if (eventNameSet.contains(dataOutName)) { 799 throw new RuntimeException("Duplicate dataIn name " + dataOutName); 800 } 801 else { 802 eventNameSet.add(dataOutName); 803 } 804 resolveTagContents("instance", dataOut, evalInst); 805 } 806 } 807 808 } 809 810 /** 811 * Add an attribute into XML element. 812 * 813 * @param attrName :attribute name 814 * @param elem : Element to add attribute 815 * @param value :Value of attribute 816 */ 817 private void addAnAttribute(String attrName, Element elem, String value) { 818 elem.setAttribute(attrName, value); 819 } 820 821 /** 822 * Resolve datasets using job configuration. 823 * 824 * @param eAppXml : Job Element XML 825 * @throws Exception thrown if failed to resolve datasets 826 */ 827 @SuppressWarnings("unchecked") 828 private void resolveDataSets(Element eAppXml) throws Exception { 829 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace()); 830 if (datasetList != null) { 831 832 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace()); 833 resolveDataSets(dsElems); 834 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 835 eAppXml.getNamespace()), evalNofuncs); 836 } 837 } 838 839 /** 840 * Resolve datasets using job configuration. 841 * 842 * @param dsElems : Data set XML element. 843 * @throws CoordinatorJobException thrown if failed to resolve datasets 844 */ 845 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException { 846 for (Element dsElem : dsElems) { 847 // Setting up default TimeUnit and EndOFDuraion 848 evalFreq.setVariable("timeunit", TimeUnit.MINUTE); 849 evalFreq.setVariable("endOfDuration", TimeUnit.NONE); 850 851 String val = resolveAttribute("frequency", dsElem, evalFreq); 852 int ival = ParamChecker.checkInteger(val, "frequency"); 853 ParamChecker.checkGTZero(ival, "frequency"); 854 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE 855 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString()); 856 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE 857 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString()); 858 val = resolveAttribute("initial-instance", dsElem, evalNofuncs); 859 ParamChecker.checkDateOozieTZ(val, "initial-instance"); 860 checkInitialInstance(val); 861 val = resolveAttribute("timezone", dsElem, evalNofuncs); 862 ParamChecker.checkTimeZone(val, "timezone"); 863 resolveTagContents("uri-template", dsElem, evalNofuncs); 864 resolveTagContents("done-flag", dsElem, evalNofuncs); 865 } 866 } 867 868 /** 869 * Resolve the content of a tag. 870 * 871 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout> 872 * @param elem : Element where the tag exists. 873 * @param eval : EL evealuator 874 * @return Resolved tag content. 875 * @throws CoordinatorJobException thrown if failed to resolve tag content 876 */ 877 @SuppressWarnings("unchecked") 878 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 879 String ret = ""; 880 if (elem != null) { 881 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) { 882 if (tagElem != null) { 883 String updated; 884 try { 885 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim()); 886 887 } 888 catch (Exception e) { 889 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 890 } 891 tagElem.removeContent(); 892 tagElem.addContent(updated); 893 ret += updated; 894 } 895 } 896 } 897 return ret; 898 } 899 900 /** 901 * Resolve an attribute value. 902 * 903 * @param attrName : Attribute name. 904 * @param elem : XML Element where attribute is defiend 905 * @param eval : ELEvaluator used to resolve 906 * @return Resolved attribute value 907 * @throws CoordinatorJobException thrown if failed to resolve an attribute value 908 */ 909 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 910 Attribute attr = elem.getAttribute(attrName); 911 String val = null; 912 if (attr != null) { 913 try { 914 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim()); 915 } 916 catch (Exception e) { 917 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 918 } 919 attr.setValue(val); 920 } 921 return val; 922 } 923 924 /** 925 * Include referred datasets into XML. 926 * 927 * @param resolvedXml : Job XML element. 928 * @param conf : Job configuration 929 * @throws CoordinatorJobException thrown if failed to include referred datasets into XML 930 */ 931 @SuppressWarnings("unchecked") 932 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException { 933 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace()); 934 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace()); 935 List<String> dsList = new ArrayList<String>(); 936 if (datasets != null) { 937 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) { 938 String incDSFile = includeElem.getTextTrim(); 939 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace()); 940 } 941 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) { 942 String dsName = e.getAttributeValue("name"); 943 if (dsList.contains(dsName)) {// Override with this DS 944 // Remove duplicate 945 removeDataSet(allDataSets, dsName); 946 } 947 else { 948 dsList.add(dsName); 949 } 950 allDataSets.addContent((Element) e.clone()); 951 } 952 } 953 insertDataSet(resolvedXml, allDataSets); 954 resolvedXml.removeChild("datasets", resolvedXml.getNamespace()); 955 } 956 957 /** 958 * Include one dataset file. 959 * 960 * @param incDSFile : Include data set filename. 961 * @param dsList :List of dataset names to verify the duplicate. 962 * @param allDataSets : Element that includes all dataset definitions. 963 * @param dsNameSpace : Data set name space 964 * @throws CoordinatorJobException thrown if failed to include one dataset file 965 */ 966 @SuppressWarnings("unchecked") 967 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace) 968 throws CoordinatorJobException { 969 Element tmpDataSets = null; 970 try { 971 String dsXml = readDefinition(incDSFile); 972 LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml); 973 tmpDataSets = XmlUtils.parseXml(dsXml); 974 } 975 catch (JDOMException e) { 976 LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage()); 977 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage()); 978 } 979 resolveDataSets(tmpDataSets.getChildren("dataset")); 980 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) { 981 String dsName = e.getAttributeValue("name"); 982 if (dsList.contains(dsName)) { 983 throw new RuntimeException("Duplicate Dataset " + dsName); 984 } 985 dsList.add(dsName); 986 Element tmp = (Element) e.clone(); 987 // TODO: Don't like to over-write the external/include DS's namespace 988 tmp.setNamespace(dsNameSpace); 989 tmp.getChild("uri-template").setNamespace(dsNameSpace); 990 if (e.getChild("done-flag") != null) { 991 tmp.getChild("done-flag").setNamespace(dsNameSpace); 992 } 993 allDataSets.addContent(tmp); 994 } 995 // nested include 996 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) { 997 String incFile = includeElem.getTextTrim(); 998 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace); 999 } 1000 } 1001 1002 /** 1003 * Remove a dataset from a list of dataset. 1004 * 1005 * @param eDatasets : List of dataset 1006 * @param name : Dataset name to be removed. 1007 */ 1008 @SuppressWarnings("unchecked") 1009 private static void removeDataSet(Element eDatasets, String name) { 1010 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 1011 if (eDataset.getAttributeValue("name").equals(name)) { 1012 eDataset.detach(); 1013 return; 1014 } 1015 } 1016 throw new RuntimeException("undefined dataset: " + name); 1017 } 1018 1019 /** 1020 * Read coordinator definition. 1021 * 1022 * @param appPath application path. 1023 * @return coordinator definition. 1024 * @throws CoordinatorJobException thrown if the definition could not be read. 1025 */ 1026 protected String readDefinition(String appPath) throws CoordinatorJobException { 1027 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 1028 // Configuration confHadoop = CoordUtils.getHadoopConf(conf); 1029 try { 1030 URI uri = new URI(appPath); 1031 LOG.debug("user =" + user); 1032 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 1033 Configuration fsConf = has.createJobConf(uri.getAuthority()); 1034 FileSystem fs = has.createFileSystem(user, uri, fsConf); 1035 Path appDefPath = null; 1036 1037 // app path could be a directory 1038 Path path = new Path(uri.getPath()); 1039 // check file exists for dataset include file, app xml already checked 1040 if (!fs.exists(path)) { 1041 throw new URISyntaxException(path.toString(), "path not existed : " + path.toString()); 1042 } 1043 if (!fs.isFile(path)) { 1044 appDefPath = new Path(path, COORDINATOR_XML_FILE); 1045 } else { 1046 appDefPath = path; 1047 } 1048 1049 Reader reader = new InputStreamReader(fs.open(appDefPath)); 1050 StringWriter writer = new StringWriter(); 1051 IOUtils.copyCharStream(reader, writer); 1052 return writer.toString(); 1053 } 1054 catch (IOException ex) { 1055 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 1056 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1057 } 1058 catch (URISyntaxException ex) { 1059 LOG.warn("URISyException :" + ex.getMessage()); 1060 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex); 1061 } 1062 catch (HadoopAccessorException ex) { 1063 throw new CoordinatorJobException(ex); 1064 } 1065 catch (Exception ex) { 1066 LOG.warn("Exception :", ex); 1067 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1068 } 1069 } 1070 1071 /** 1072 * Write a coordinator job into database 1073 * 1074 * @param eJob : XML element of job 1075 * @param coordJob : Coordinator job bean 1076 * @return Job id 1077 * @throws CommandException thrown if unable to save coordinator job to db 1078 */ 1079 private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException { 1080 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR); 1081 coordJob.setId(jobId); 1082 1083 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH)); 1084 coordJob.setCreatedTime(new Date()); 1085 coordJob.setUser(conf.get(OozieClient.USER_NAME)); 1086 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 1087 coordJob.setGroup(group); 1088 coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 1089 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString()); 1090 coordJob.setLastActionNumber(0); 1091 coordJob.setLastModifiedTime(new Date()); 1092 1093 if (!dryrun) { 1094 coordJob.setLastModifiedTime(new Date()); 1095 try { 1096 jpaService.execute(new CoordJobInsertJPAExecutor(coordJob)); 1097 } 1098 catch (JPAExecutorException jpaee) { 1099 coordJob.setId(null); 1100 coordJob.setStatus(CoordinatorJob.Status.FAILED); 1101 throw new CommandException(jpaee); 1102 } 1103 } 1104 return jobId; 1105 } 1106 1107 /* 1108 * this method checks if the initial-instance specified for a particular 1109 is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC 1110 */ 1111 private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException { 1112 Date initialInstance, givenInstance; 1113 try { 1114 initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z"); 1115 givenInstance = DateUtils.parseDateOozieTZ(val); 1116 } 1117 catch (Exception e) { 1118 throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val + 1119 "' to Date object. ",e); 1120 } 1121 if(givenInstance.compareTo(initialInstance) < 0) { 1122 throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val + 1123 " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance)); 1124 } 1125 } 1126 1127 /* (non-Javadoc) 1128 * @see org.apache.oozie.command.XCommand#getEntityKey() 1129 */ 1130 @Override 1131 public String getEntityKey() { 1132 return null; 1133 } 1134 1135 /* (non-Javadoc) 1136 * @see org.apache.oozie.command.XCommand#isLockRequired() 1137 */ 1138 @Override 1139 protected boolean isLockRequired() { 1140 return false; 1141 } 1142 1143 /* (non-Javadoc) 1144 * @see org.apache.oozie.command.XCommand#loadState() 1145 */ 1146 @Override 1147 protected void loadState() throws CommandException { 1148 jpaService = Services.get().get(JPAService.class); 1149 if (jpaService == null) { 1150 throw new CommandException(ErrorCode.E0610); 1151 } 1152 coordJob = new CoordinatorJobBean(); 1153 if (this.bundleId != null) { 1154 // this coord job is created from bundle 1155 coordJob.setBundleId(this.bundleId); 1156 // first use bundle id if submit thru bundle 1157 logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId); 1158 LogUtils.setLogInfo(logInfo); 1159 } 1160 if (this.coordName != null) { 1161 // this coord job is created from bundle 1162 coordJob.setAppName(this.coordName); 1163 } 1164 setJob(coordJob); 1165 1166 } 1167 1168 /* (non-Javadoc) 1169 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 1170 */ 1171 @Override 1172 protected void verifyPrecondition() throws CommandException { 1173 1174 } 1175 1176 /* (non-Javadoc) 1177 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 1178 */ 1179 @Override 1180 public void notifyParent() throws CommandException { 1181 // update bundle action 1182 if (coordJob.getBundleId() != null) { 1183 LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId()); 1184 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 1185 bundleStatusUpdate.call(); 1186 } 1187 } 1188 1189 /* (non-Javadoc) 1190 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 1191 */ 1192 @Override 1193 public void updateJob() throws CommandException { 1194 } 1195 1196 /* (non-Javadoc) 1197 * @see org.apache.oozie.command.TransitionXCommand#getJob() 1198 */ 1199 @Override 1200 public Job getJob() { 1201 return coordJob; 1202 } 1203 1204 @Override 1205 public void performWrites() throws CommandException { 1206 } 1207 }