001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.InputStreamReader; 022 import java.io.Reader; 023 import java.io.StringReader; 024 import java.io.StringWriter; 025 import java.net.URI; 026 import java.net.URISyntaxException; 027 import java.util.ArrayList; 028 import java.util.Date; 029 import java.util.HashMap; 030 import java.util.HashSet; 031 import java.util.Iterator; 032 import java.util.List; 033 import java.util.Map; 034 import java.util.Set; 035 import java.util.TreeSet; 036 037 import javax.xml.transform.stream.StreamSource; 038 import javax.xml.validation.Validator; 039 040 import org.apache.hadoop.conf.Configuration; 041 import org.apache.hadoop.fs.FileSystem; 042 import org.apache.hadoop.fs.Path; 043 import org.apache.oozie.CoordinatorJobBean; 044 import org.apache.oozie.ErrorCode; 045 import org.apache.oozie.client.CoordinatorJob; 046 import org.apache.oozie.client.Job; 047 import org.apache.oozie.client.OozieClient; 048 import org.apache.oozie.client.CoordinatorJob.Execution; 049 import org.apache.oozie.command.CommandException; 050 import 
org.apache.oozie.command.SubmitTransitionXCommand; 051 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 052 import org.apache.oozie.coord.CoordELEvaluator; 053 import org.apache.oozie.coord.CoordELFunctions; 054 import org.apache.oozie.coord.CoordinatorJobException; 055 import org.apache.oozie.coord.TimeUnit; 056 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; 057 import org.apache.oozie.executor.jpa.JPAExecutorException; 058 import org.apache.oozie.service.DagXLogInfoService; 059 import org.apache.oozie.service.HadoopAccessorException; 060 import org.apache.oozie.service.HadoopAccessorService; 061 import org.apache.oozie.service.JPAService; 062 import org.apache.oozie.service.SchemaService; 063 import org.apache.oozie.service.Service; 064 import org.apache.oozie.service.Services; 065 import org.apache.oozie.service.UUIDService; 066 import org.apache.oozie.service.SchemaService.SchemaName; 067 import org.apache.oozie.service.UUIDService.ApplicationType; 068 import org.apache.oozie.util.ConfigUtils; 069 import org.apache.oozie.util.DateUtils; 070 import org.apache.oozie.util.ELEvaluator; 071 import org.apache.oozie.util.ELUtils; 072 import org.apache.oozie.util.IOUtils; 073 import org.apache.oozie.util.InstrumentUtils; 074 import org.apache.oozie.util.LogUtils; 075 import org.apache.oozie.util.ParamChecker; 076 import org.apache.oozie.util.ParameterVerifier; 077 import org.apache.oozie.util.ParameterVerifierException; 078 import org.apache.oozie.util.PropertiesUtils; 079 import org.apache.oozie.util.XConfiguration; 080 import org.apache.oozie.util.XLog; 081 import org.apache.oozie.util.XmlUtils; 082 import org.jdom.Attribute; 083 import org.jdom.Element; 084 import org.jdom.JDOMException; 085 import org.jdom.Namespace; 086 import org.xml.sax.SAXException; 087 088 /** 089 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB 090 * table. 
091 * <p/> 092 * Specifically it performs the following functions: 1. Resolve all the variables or properties using job 093 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML 094 * at runtime. 095 */ 096 public class CoordSubmitXCommand extends SubmitTransitionXCommand { 097 098 private Configuration conf; 099 private final String authToken; 100 private final String bundleId; 101 private final String coordName; 102 private boolean dryrun; 103 private JPAService jpaService = null; 104 private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP; 105 106 public static final String CONFIG_DEFAULT = "coord-config-default.xml"; 107 public static final String COORDINATOR_XML_FILE = "coordinator.xml"; 108 public final String COORD_INPUT_EVENTS ="input-events"; 109 public final String COORD_OUTPUT_EVENTS = "output-events"; 110 public final String COORD_INPUT_EVENTS_DATA_IN ="data-in"; 111 public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out"; 112 113 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 114 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 115 116 private CoordinatorJobBean coordJob = null; 117 /** 118 * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout 119 */ 120 public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout"; 121 122 public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency"; 123 124 public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle"; 125 126 public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX 127 + "coord.materialization.throttling.factor"; 128 129 /** 130 * Default MAX timeout in minutes, after which coordinator input check will timeout 131 */ 132 public static final String 
CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; 133 134 135 public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size"; 136 137 private ELEvaluator evalFreq = null; 138 private ELEvaluator evalNofuncs = null; 139 private ELEvaluator evalData = null; 140 private ELEvaluator evalInst = null; 141 private ELEvaluator evalAction = null; 142 private ELEvaluator evalSla = null; 143 144 static { 145 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 146 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 147 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 148 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 149 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 150 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 151 152 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 153 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 154 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 155 } 156 157 /** 158 * Constructor to create the Coordinator Submit Command. 159 * 160 * @param conf : Configuration for Coordinator job 161 * @param authToken : To be used for authentication 162 */ 163 public CoordSubmitXCommand(Configuration conf, String authToken) { 164 super("coord_submit", "coord_submit", 1); 165 this.conf = ParamChecker.notNull(conf, "conf"); 166 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 167 this.bundleId = null; 168 this.coordName = null; 169 } 170 171 /** 172 * Constructor to create the Coordinator Submit Command by bundle job. 
173 * 174 * @param conf : Configuration for Coordinator job 175 * @param authToken : To be used for authentication 176 * @param bundleId : bundle id 177 * @param coordName : coord name 178 */ 179 public CoordSubmitXCommand(Configuration conf, String authToken, String bundleId, String coordName) { 180 super("coord_submit", "coord_submit", 1); 181 this.conf = ParamChecker.notNull(conf, "conf"); 182 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 183 this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId"); 184 this.coordName = ParamChecker.notEmpty(coordName, "coordName"); 185 } 186 187 /** 188 * Constructor to create the Coordinator Submit Command. 189 * 190 * @param dryrun : if dryrun 191 * @param conf : Configuration for Coordinator job 192 * @param authToken : To be used for authentication 193 */ 194 public CoordSubmitXCommand(boolean dryrun, Configuration conf, String authToken) { 195 this(conf, authToken); 196 this.dryrun = dryrun; 197 } 198 199 /* (non-Javadoc) 200 * @see org.apache.oozie.command.XCommand#execute() 201 */ 202 @Override 203 protected String submit() throws CommandException { 204 String jobId = null; 205 LOG.info("STARTED Coordinator Submit"); 206 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 207 208 boolean exceptionOccured = false; 209 try { 210 mergeDefaultConfig(); 211 212 String appXml = readAndValidateXml(); 213 coordJob.setOrigJobXml(appXml); 214 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 215 216 Element eXml = XmlUtils.parseXml(appXml); 217 218 String appNamespace = readAppNamespace(eXml); 219 coordJob.setAppNamespace(appNamespace); 220 221 ParameterVerifier.verifyParameters(conf, eXml); 222 223 appXml = XmlUtils.removeComments(appXml); 224 initEvaluators(); 225 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob); 226 227 validateCoordinatorJob(); 228 229 // checking if the coordinator application data input/output events 230 // specify 
multiple data instance values in erroneous manner 231 checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN); 232 checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT); 233 234 LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString()); 235 236 jobId = storeToDB(eJob, coordJob); 237 // log job info for coordinator job 238 LogUtils.setLogInfo(coordJob, logInfo); 239 LOG = XLog.resetPrefix(LOG); 240 241 if (!dryrun) { 242 // submit a command to materialize jobs for the next 1 hour (3600 secs) 243 // so we don't wait 10 mins for the Service to run. 244 queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100); 245 } 246 else { 247 Date startTime = coordJob.getStartTime(); 248 long startTimeMilli = startTime.getTime(); 249 long endTimeMilli = startTimeMilli + (3600 * 1000); 250 Date jobEndTime = coordJob.getEndTime(); 251 Date endTime = new Date(endTimeMilli); 252 if (endTime.compareTo(jobEndTime) > 0) { 253 endTime = jobEndTime; 254 } 255 jobId = coordJob.getId(); 256 LOG.info("[" + jobId + "]: Update status to RUNNING"); 257 coordJob.setStatus(Job.Status.RUNNING); 258 coordJob.setPending(); 259 CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime, 260 endTime); 261 Configuration jobConf = null; 262 try { 263 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 264 } 265 catch (IOException e1) { 266 LOG.warn("Configuration parse error. 
read from DB :" + coordJob.getConf(), e1); 267 } 268 String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null); 269 String output = coordJob.getJobXml() + System.getProperty("line.separator") 270 + "***actions for instance***" + action; 271 return output; 272 } 273 } 274 catch (JDOMException jex) { 275 exceptionOccured = true; 276 LOG.warn("ERROR: ", jex); 277 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex); 278 } 279 catch (CoordinatorJobException cex) { 280 exceptionOccured = true; 281 LOG.warn("ERROR: ", cex); 282 throw new CommandException(cex); 283 } 284 catch (ParameterVerifierException pex) { 285 exceptionOccured = true; 286 LOG.warn("ERROR: ", pex); 287 throw new CommandException(pex); 288 } 289 catch (IllegalArgumentException iex) { 290 exceptionOccured = true; 291 LOG.warn("ERROR: ", iex); 292 throw new CommandException(ErrorCode.E1003, iex); 293 } 294 catch (Exception ex) { 295 exceptionOccured = true; 296 LOG.warn("ERROR: ", ex); 297 throw new CommandException(ErrorCode.E0803, ex); 298 } 299 finally { 300 if (exceptionOccured) { 301 if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){ 302 coordJob.setStatus(CoordinatorJob.Status.FAILED); 303 coordJob.resetPending(); 304 } 305 } 306 } 307 308 LOG.info("ENDED Coordinator Submit jobId=" + jobId); 309 return jobId; 310 } 311 312 /** 313 * Method that validates values in the definition for correctness. Placeholder to add more. 
314 */ 315 private void validateCoordinatorJob() { 316 // check if startTime < endTime 317 if (coordJob.getStartTime().after(coordJob.getEndTime())) { 318 throw new IllegalArgumentException("Coordinator Start Time cannot be greater than End Time."); 319 } 320 } 321 322 /* 323 * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag 324 * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list 325 */ 326 private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException { 327 Element eventsSpec, dataSpec, instance; 328 List<Element> instanceSpecList; 329 Namespace ns = eCoordJob.getNamespace(); 330 String instanceValue; 331 eventsSpec = eCoordJob.getChild(eventType, ns); 332 if (eventsSpec != null) { 333 dataSpec = eventsSpec.getChild(dataType, ns); 334 if (dataSpec != null) { 335 // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors 336 instanceSpecList = dataSpec.getChildren("instance", ns); 337 Iterator instanceIter = instanceSpecList.iterator(); 338 while(instanceIter.hasNext()) { 339 instance = ((Element) instanceIter.next()); 340 if(instance.getContentSize() == 0) { //empty string or whitespace 341 throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!"); 342 } 343 instanceValue = instance.getContent(0).toString(); 344 boolean isInvalid = false; 345 try { 346 isInvalid = evalAction.checkForExistence(instanceValue, ","); 347 } catch (Exception e) { 348 handleELParseException(eventType, dataType, instanceValue); 349 } 350 if (isInvalid) { // reaching this block implies instance is not empty i.e. 
length > 0 351 handleExpresionWithMultipleInstances(eventType, dataType, instanceValue); 352 } 353 } 354 355 // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors 356 instanceSpecList = dataSpec.getChildren("start-instance", ns); 357 instanceIter = instanceSpecList.iterator(); 358 while(instanceIter.hasNext()) { 359 instance = ((Element) instanceIter.next()); 360 if(instance.getContentSize() == 0) { //empty string or whitespace 361 throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!"); 362 } 363 instanceValue = instance.getContent(0).toString(); 364 boolean isInvalid = false; 365 try { 366 isInvalid = evalAction.checkForExistence(instanceValue, ","); 367 } catch (Exception e) { 368 handleELParseException(eventType, dataType, instanceValue); 369 } 370 if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0 371 handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue); 372 } 373 } 374 375 // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors 376 instanceSpecList = dataSpec.getChildren("end-instance", ns); 377 instanceIter = instanceSpecList.iterator(); 378 while(instanceIter.hasNext()) { 379 instance = ((Element) instanceIter.next()); 380 if(instance.getContentSize() == 0) { //empty string or whitespace 381 throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!"); 382 } 383 instanceValue = instance.getContent(0).toString(); 384 boolean isInvalid = false; 385 try { 386 isInvalid = evalAction.checkForExistence(instanceValue, ","); 387 } catch (Exception e) { 388 handleELParseException(eventType, dataType, instanceValue); 389 } 390 if (isInvalid) { // reaching this block implies instance is not empty i.e. 
length > 0 391 handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue); 392 } 393 } 394 395 } 396 } 397 } 398 399 private void handleELParseException(String eventType, String dataType, String instanceValue) 400 throws CoordinatorJobException { 401 String correctAction = null; 402 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 403 correctAction = "Coordinator app definition should have valid <instance> tag for data-in"; 404 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 405 correctAction = "Coordinator app definition should have valid <instance> tag for data-out"; 406 } 407 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 408 + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction); 409 } 410 411 private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue) 412 throws CoordinatorJobException { 413 String correctAction = null; 414 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 415 correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance"; 416 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 417 correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance"; 418 } 419 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 420 + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction); 421 } 422 423 private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue) 424 throws CoordinatorJobException { 425 String correctAction = "Coordinator app definition should not have multiple start-instances"; 426 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue 427 + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. 
" + correctAction); 428 } 429 430 private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue) 431 throws CoordinatorJobException { 432 String correctAction = "Coordinator app definition should not have multiple end-instances"; 433 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue 434 + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction); 435 } 436 437 /** 438 * Read the application XML and validate against coordinator Schema 439 * 440 * @return validated coordinator XML 441 * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml 442 */ 443 private String readAndValidateXml() throws CoordinatorJobException { 444 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH), 445 OozieClient.COORDINATOR_APP_PATH); 446 String coordXml = readDefinition(appPath); 447 validateXml(coordXml); 448 return coordXml; 449 } 450 451 /** 452 * Validate against Coordinator XSD file 453 * 454 * @param xmlContent : Input coordinator xml 455 * @throws CoordinatorJobException thrown if unable to validate coordinator xml 456 */ 457 private void validateXml(String xmlContent) throws CoordinatorJobException { 458 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR); 459 Validator validator = schema.newValidator(); 460 try { 461 validator.validate(new StreamSource(new StringReader(xmlContent))); 462 } 463 catch (SAXException ex) { 464 LOG.warn("SAXException :", ex); 465 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex); 466 } 467 catch (IOException ex) { 468 LOG.warn("IOException :", ex); 469 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex); 470 } 471 } 472 473 /** 474 * Read the application XML schema namespace 475 * 476 * @param coordXmlElement input coordinator xml Element 477 * @return app xml 
namespace 478 * @throws CoordinatorJobException 479 */ 480 private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException { 481 Namespace ns = coordXmlElement.getNamespace(); 482 if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 483 throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace " 484 + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later"); 485 } 486 if (ns != null) { 487 return ns.getURI(); 488 } 489 else { 490 throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given"); 491 } 492 } 493 494 /** 495 * Merge default configuration with user-defined configuration. 496 * 497 * @throws CommandException thrown if failed to read or merge configurations 498 */ 499 protected void mergeDefaultConfig() throws CommandException { 500 Path configDefault = null; 501 try { 502 String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH); 503 Path coordAppPath = new Path(coordAppPathStr); 504 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 505 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 506 Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority()); 507 FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf); 508 509 // app path could be a directory 510 if (!fs.isFile(coordAppPath)) { 511 configDefault = new Path(coordAppPath, CONFIG_DEFAULT); 512 } else { 513 configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT); 514 } 515 516 if (fs.exists(configDefault)) { 517 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 518 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 519 XConfiguration.injectDefaults(defaultConf, conf); 520 } 521 else { 522 LOG.info("configDefault Doesn't exist " + configDefault); 523 } 524 
PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 525 526 // Resolving all variables in the job properties. 527 // This ensures the Hadoop Configuration semantics is preserved. 528 XConfiguration resolvedVarsConf = new XConfiguration(); 529 for (Map.Entry<String, String> entry : conf) { 530 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 531 } 532 conf = resolvedVarsConf; 533 } 534 catch (IOException e) { 535 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 536 + configDefault, e); 537 } 538 catch (HadoopAccessorException e) { 539 throw new CommandException(e); 540 } 541 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 542 } 543 544 /** 545 * The method resolve all the variables that are defined in configuration. It also include the data set definition 546 * from dataset file into XML. 547 * 548 * @param appXml : Original job XML 549 * @param conf : Configuration of the job 550 * @param coordJob : Coordinator job bean to be populated. 551 * @return Resolved and modified job XML element. 552 * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets 553 * @throws Exception thrown if failed to resolve basic entities or include referred datasets 554 */ 555 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob) 556 throws CoordinatorJobException, Exception { 557 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob); 558 includeDataSets(basicResolvedApp, conf); 559 return basicResolvedApp; 560 } 561 562 /** 563 * Insert data set into data-in and data-out tags. 
564 * 565 * @param eAppXml : coordinator application XML 566 * @param eDatasets : DataSet XML 567 */ 568 @SuppressWarnings("unchecked") 569 private void insertDataSet(Element eAppXml, Element eDatasets) { 570 // Adding DS definition in the coordinator XML 571 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 572 if (inputList != null) { 573 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 574 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset")); 575 dataIn.getContent().add(0, eDataset); 576 } 577 } 578 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 579 if (outputList != null) { 580 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 581 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset")); 582 dataOut.getContent().add(0, eDataset); 583 } 584 } 585 } 586 587 /** 588 * Find a specific dataset from a list of Datasets. 589 * 590 * @param eDatasets : List of data sets 591 * @param name : queried data set name 592 * @return one Dataset element. otherwise throw Exception 593 */ 594 @SuppressWarnings("unchecked") 595 private static Element findDataSet(Element eDatasets, String name) { 596 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 597 if (eDataset.getAttributeValue("name").equals(name)) { 598 eDataset = (Element) eDataset.clone(); 599 eDataset.detach(); 600 return eDataset; 601 } 602 } 603 throw new RuntimeException("undefined dataset: " + name); 604 } 605 606 /** 607 * Initialize all the required EL Evaluators. 
608 */ 609 protected void initEvaluators() { 610 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq"); 611 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs"); 612 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances"); 613 evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit"); 614 evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start"); 615 } 616 617 /** 618 * Resolve basic entities using job Configuration. 619 * 620 * @param conf :Job configuration 621 * @param appXml : Original job XML 622 * @param coordJob : Coordinator job bean to be populated. 623 * @return Resolved job XML element. 624 * @throws CoordinatorJobException thrown if failed to resolve basic entities 625 * @throws Exception thrown if failed to resolve basic entities 626 */ 627 @SuppressWarnings("unchecked") 628 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob) 629 throws CoordinatorJobException, Exception { 630 Element eAppXml = XmlUtils.parseXml(appXml); 631 // job's main attributes 632 // frequency 633 String val = resolveAttribute("frequency", eAppXml, evalFreq); 634 int ival = ParamChecker.checkInteger(val, "frequency"); 635 ParamChecker.checkGTZero(ival, "frequency"); 636 coordJob.setFrequency(ival); 637 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq 638 .getVariable("timeunit")); 639 addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); 640 // TimeUnit 641 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString())); 642 // End Of Duration 643 tmp = evalFreq.getVariable("endOfDuration") == null ? 
TimeUnit.NONE : ((TimeUnit) evalFreq 644 .getVariable("endOfDuration")); 645 addAnAttribute("end_of_duration", eAppXml, tmp.toString()); 646 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean 647 648 // Application name 649 if (this.coordName == null) { 650 String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf); 651 coordJob.setAppName(name); 652 } 653 else { 654 // this coord job is created from bundle 655 coordJob.setAppName(this.coordName); 656 } 657 658 // start time 659 val = resolveAttribute("start", eAppXml, evalNofuncs); 660 ParamChecker.checkDateOozieTZ(val, "start"); 661 coordJob.setStartTime(DateUtils.parseDateOozieTZ(val)); 662 // end time 663 val = resolveAttribute("end", eAppXml, evalNofuncs); 664 ParamChecker.checkDateOozieTZ(val, "end"); 665 coordJob.setEndTime(DateUtils.parseDateOozieTZ(val)); 666 // Time zone 667 val = resolveAttribute("timezone", eAppXml, evalNofuncs); 668 ParamChecker.checkTimeZone(val, "timezone"); 669 coordJob.setTimeZone(val); 670 671 // controls 672 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 673 if (val == "") { 674 val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL); 675 } 676 677 ival = ParamChecker.checkInteger(val, "timeout"); 678 if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) { 679 ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600); 680 } 681 coordJob.setTimeout(ival); 682 683 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 684 if (val == null || val.isEmpty()) { 685 val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1"); 686 } 687 ival = ParamChecker.checkInteger(val, "concurrency"); 688 coordJob.setConcurrency(ival); 689 690 val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 691 if (val == null || 
val.isEmpty()) { 692 int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12); 693 ival = defaultThrottle; 694 } 695 else { 696 ival = ParamChecker.checkInteger(val, "throttle"); 697 } 698 int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000); 699 float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f); 700 int maxThrottle = (int) (maxQueue * factor); 701 if (ival > maxThrottle || ival < 1) { 702 ival = maxThrottle; 703 } 704 LOG.debug("max throttle " + ival); 705 coordJob.setMatThrottling(ival); 706 707 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 708 if (val == "") { 709 val = Execution.FIFO.toString(); 710 } 711 coordJob.setExecution(Execution.valueOf(val)); 712 String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() }; 713 ParamChecker.isMember(val, acceptedVals, "execution"); 714 715 // datasets 716 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs); 717 // for each data set 718 resolveDataSets(eAppXml); 719 HashMap<String, String> dataNameList = new HashMap<String, String>(); 720 resolveIOEvents(eAppXml, dataNameList); 721 722 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 723 eAppXml.getNamespace()), evalNofuncs); 724 // TODO: If action or workflow tag is missing, NullPointerException will 725 // occur 726 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 727 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace()); 728 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList); 729 if (configElem != null) { 730 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 731 resolveTagContents("name", propElem, evalData); 732 // 
Want to check the data-integrity but don't want to modify the 733 // XML 734 // for properties only 735 Element tmpProp = (Element) propElem.clone(); 736 resolveTagContents("value", tmpProp, evalData); 737 } 738 } 739 resolveSLA(eAppXml, coordJob); 740 return eAppXml; 741 } 742 743 /** 744 * Resolve SLA events 745 * 746 * @param eAppXml job XML 747 * @param coordJob coordinator job bean 748 * @throws CommandException thrown if failed to resolve sla events 749 */ 750 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException { 751 Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info", 752 Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 753 754 if (eSla != null) { 755 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 756 try { 757 // EL evaluation 758 slaXml = evalSla.evaluate(slaXml, String.class); 759 // Validate against semantic SXD 760 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 761 } 762 catch (Exception e) { 763 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e); 764 } 765 } 766 } 767 768 /** 769 * Resolve input-events/data-in and output-events/data-out tags. 
770 * 771 * @param eJob : Job element 772 * @throws CoordinatorJobException thrown if failed to resolve input and output events 773 */ 774 @SuppressWarnings("unchecked") 775 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException { 776 // Resolving input-events/data-in 777 // Clone the job and don't update anything in the original 778 Element eJob = (Element) eJobOrg.clone(); 779 Element inputList = eJob.getChild("input-events", eJob.getNamespace()); 780 if (inputList != null) { 781 TreeSet<String> eventNameSet = new TreeSet<String>(); 782 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) { 783 String dataInName = dataIn.getAttributeValue("name"); 784 dataNameList.put(dataInName, "data-in"); 785 // check whether there is any duplicate data-in name 786 if (eventNameSet.contains(dataInName)) { 787 throw new RuntimeException("Duplicate dataIn name " + dataInName); 788 } 789 else { 790 eventNameSet.add(dataInName); 791 } 792 resolveTagContents("instance", dataIn, evalInst); 793 resolveTagContents("start-instance", dataIn, evalInst); 794 resolveTagContents("end-instance", dataIn, evalInst); 795 } 796 } 797 // Resolving output-events/data-out 798 Element outputList = eJob.getChild("output-events", eJob.getNamespace()); 799 if (outputList != null) { 800 TreeSet<String> eventNameSet = new TreeSet<String>(); 801 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) { 802 String dataOutName = dataOut.getAttributeValue("name"); 803 dataNameList.put(dataOutName, "data-out"); 804 // check whether there is any duplicate data-out name 805 if (eventNameSet.contains(dataOutName)) { 806 throw new RuntimeException("Duplicate dataIn name " + dataOutName); 807 } 808 else { 809 eventNameSet.add(dataOutName); 810 } 811 resolveTagContents("instance", dataOut, evalInst); 812 } 813 } 814 815 } 816 817 /** 818 * Add an attribute into XML element. 
819 * 820 * @param attrName :attribute name 821 * @param elem : Element to add attribute 822 * @param value :Value of attribute 823 */ 824 private void addAnAttribute(String attrName, Element elem, String value) { 825 elem.setAttribute(attrName, value); 826 } 827 828 /** 829 * Resolve datasets using job configuration. 830 * 831 * @param eAppXml : Job Element XML 832 * @throws Exception thrown if failed to resolve datasets 833 */ 834 @SuppressWarnings("unchecked") 835 private void resolveDataSets(Element eAppXml) throws Exception { 836 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace()); 837 if (datasetList != null) { 838 839 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace()); 840 resolveDataSets(dsElems); 841 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 842 eAppXml.getNamespace()), evalNofuncs); 843 } 844 } 845 846 /** 847 * Resolve datasets using job configuration. 848 * 849 * @param dsElems : Data set XML element. 850 * @throws CoordinatorJobException thrown if failed to resolve datasets 851 */ 852 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException { 853 for (Element dsElem : dsElems) { 854 // Setting up default TimeUnit and EndOFDuraion 855 evalFreq.setVariable("timeunit", TimeUnit.MINUTE); 856 evalFreq.setVariable("endOfDuration", TimeUnit.NONE); 857 858 String val = resolveAttribute("frequency", dsElem, evalFreq); 859 int ival = ParamChecker.checkInteger(val, "frequency"); 860 ParamChecker.checkGTZero(ival, "frequency"); 861 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE 862 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString()); 863 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? 
TimeUnit.NONE 864 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString()); 865 val = resolveAttribute("initial-instance", dsElem, evalNofuncs); 866 ParamChecker.checkDateOozieTZ(val, "initial-instance"); 867 checkInitialInstance(val); 868 val = resolveAttribute("timezone", dsElem, evalNofuncs); 869 ParamChecker.checkTimeZone(val, "timezone"); 870 resolveTagContents("uri-template", dsElem, evalNofuncs); 871 resolveTagContents("done-flag", dsElem, evalNofuncs); 872 } 873 } 874 875 /** 876 * Resolve the content of a tag. 877 * 878 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout> 879 * @param elem : Element where the tag exists. 880 * @param eval : EL evealuator 881 * @return Resolved tag content. 882 * @throws CoordinatorJobException thrown if failed to resolve tag content 883 */ 884 @SuppressWarnings("unchecked") 885 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 886 String ret = ""; 887 if (elem != null) { 888 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) { 889 if (tagElem != null) { 890 String updated; 891 try { 892 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim()); 893 894 } 895 catch (Exception e) { 896 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 897 } 898 tagElem.removeContent(); 899 tagElem.addContent(updated); 900 ret += updated; 901 } 902 } 903 } 904 return ret; 905 } 906 907 /** 908 * Resolve an attribute value. 909 * 910 * @param attrName : Attribute name. 
911 * @param elem : XML Element where attribute is defiend 912 * @param eval : ELEvaluator used to resolve 913 * @return Resolved attribute value 914 * @throws CoordinatorJobException thrown if failed to resolve an attribute value 915 */ 916 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 917 Attribute attr = elem.getAttribute(attrName); 918 String val = null; 919 if (attr != null) { 920 try { 921 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim()); 922 } 923 catch (Exception e) { 924 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 925 } 926 attr.setValue(val); 927 } 928 return val; 929 } 930 931 /** 932 * Include referred datasets into XML. 933 * 934 * @param resolvedXml : Job XML element. 935 * @param conf : Job configuration 936 * @throws CoordinatorJobException thrown if failed to include referred datasets into XML 937 */ 938 @SuppressWarnings("unchecked") 939 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException { 940 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace()); 941 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace()); 942 List<String> dsList = new ArrayList<String>(); 943 if (datasets != null) { 944 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) { 945 String incDSFile = includeElem.getTextTrim(); 946 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace()); 947 } 948 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) { 949 String dsName = e.getAttributeValue("name"); 950 if (dsList.contains(dsName)) {// Override with this DS 951 // Remove old DS 952 removeDataSet(allDataSets, dsName); 953 } 954 else { 955 dsList.add(dsName); 956 } 957 allDataSets.addContent((Element) e.clone()); 958 } 959 } 960 insertDataSet(resolvedXml, allDataSets); 961 
resolvedXml.removeChild("datasets", resolvedXml.getNamespace()); 962 } 963 964 /** 965 * Include one dataset file. 966 * 967 * @param incDSFile : Include data set filename. 968 * @param dsList :List of dataset names to verify the duplicate. 969 * @param allDataSets : Element that includes all dataset definitions. 970 * @param dsNameSpace : Data set name space 971 * @throws CoordinatorJobException thrown if failed to include one dataset file 972 */ 973 @SuppressWarnings("unchecked") 974 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace) 975 throws CoordinatorJobException { 976 Element tmpDataSets = null; 977 try { 978 String dsXml = readDefinition(incDSFile); 979 LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml); 980 tmpDataSets = XmlUtils.parseXml(dsXml); 981 } 982 catch (JDOMException e) { 983 LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage()); 984 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage()); 985 } 986 resolveDataSets(tmpDataSets.getChildren("dataset")); 987 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) { 988 String dsName = e.getAttributeValue("name"); 989 if (dsList.contains(dsName)) { 990 throw new RuntimeException("Duplicate Dataset " + dsName); 991 } 992 dsList.add(dsName); 993 Element tmp = (Element) e.clone(); 994 // TODO: Don't like to over-write the external/include DS's namespace 995 tmp.setNamespace(dsNameSpace); 996 tmp.getChild("uri-template").setNamespace(dsNameSpace); 997 if (e.getChild("done-flag") != null) { 998 tmp.getChild("done-flag").setNamespace(dsNameSpace); 999 } 1000 allDataSets.addContent(tmp); 1001 } 1002 // nested include 1003 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) { 1004 String incFile = includeElem.getTextTrim(); 1005 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace); 1006 } 1007 } 1008 1009 /** 1010 * Remove a 
dataset from a list of dataset. 1011 * 1012 * @param eDatasets : List of dataset 1013 * @param name : Dataset name to be removed. 1014 */ 1015 @SuppressWarnings("unchecked") 1016 private static void removeDataSet(Element eDatasets, String name) { 1017 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 1018 if (eDataset.getAttributeValue("name").equals(name)) { 1019 eDataset.detach(); 1020 } 1021 } 1022 throw new RuntimeException("undefined dataset: " + name); 1023 } 1024 1025 /** 1026 * Read coordinator definition. 1027 * 1028 * @param appPath application path. 1029 * @return coordinator definition. 1030 * @throws CoordinatorJobException thrown if the definition could not be read. 1031 */ 1032 protected String readDefinition(String appPath) throws CoordinatorJobException { 1033 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 1034 // Configuration confHadoop = CoordUtils.getHadoopConf(conf); 1035 try { 1036 URI uri = new URI(appPath); 1037 LOG.debug("user =" + user); 1038 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 1039 Configuration fsConf = has.createJobConf(uri.getAuthority()); 1040 FileSystem fs = has.createFileSystem(user, uri, fsConf); 1041 Path appDefPath = null; 1042 1043 // app path could be a directory 1044 Path path = new Path(uri.getPath()); 1045 // check file exists for dataset include file, app xml already checked 1046 if (!fs.exists(path)) { 1047 throw new URISyntaxException(path.toString(), "path not existed : " + path.toString()); 1048 } 1049 if (!fs.isFile(path)) { 1050 appDefPath = new Path(path, COORDINATOR_XML_FILE); 1051 } else { 1052 appDefPath = path; 1053 } 1054 1055 Reader reader = new InputStreamReader(fs.open(appDefPath)); 1056 StringWriter writer = new StringWriter(); 1057 IOUtils.copyCharStream(reader, writer); 1058 return writer.toString(); 1059 } 1060 catch (IOException ex) { 1061 LOG.warn("IOException :" + 
XmlUtils.prettyPrint(conf), ex); 1062 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1063 } 1064 catch (URISyntaxException ex) { 1065 LOG.warn("URISyException :" + ex.getMessage()); 1066 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex); 1067 } 1068 catch (HadoopAccessorException ex) { 1069 throw new CoordinatorJobException(ex); 1070 } 1071 catch (Exception ex) { 1072 LOG.warn("Exception :", ex); 1073 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1074 } 1075 } 1076 1077 /** 1078 * Write a coordinator job into database 1079 * 1080 * @param eJob : XML element of job 1081 * @param coordJob : Coordinator job bean 1082 * @return Job id 1083 * @throws CommandException thrown if unable to save coordinator job to db 1084 */ 1085 private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException { 1086 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR); 1087 coordJob.setId(jobId); 1088 coordJob.setAuthToken(this.authToken); 1089 1090 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH)); 1091 coordJob.setCreatedTime(new Date()); 1092 coordJob.setUser(conf.get(OozieClient.USER_NAME)); 1093 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 1094 coordJob.setGroup(group); 1095 coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 1096 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString()); 1097 coordJob.setLastActionNumber(0); 1098 coordJob.setLastModifiedTime(new Date()); 1099 1100 if (!dryrun) { 1101 coordJob.setLastModifiedTime(new Date()); 1102 try { 1103 jpaService.execute(new CoordJobInsertJPAExecutor(coordJob)); 1104 } 1105 catch (JPAExecutorException je) { 1106 throw new CommandException(je); 1107 } 1108 } 1109 return jobId; 1110 } 1111 1112 /* 1113 * this method checks if the initial-instance specified for a particular 1114 is not a date earlier 
than the oozie server default Jan 01, 1970 00:00Z UTC 1115 */ 1116 private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException { 1117 Date initialInstance, givenInstance; 1118 try { 1119 initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z"); 1120 givenInstance = DateUtils.parseDateOozieTZ(val); 1121 } 1122 catch (Exception e) { 1123 throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val + 1124 "' to Date object. ",e); 1125 } 1126 if(givenInstance.compareTo(initialInstance) < 0) { 1127 throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val + 1128 " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance)); 1129 } 1130 } 1131 1132 /* (non-Javadoc) 1133 * @see org.apache.oozie.command.XCommand#getEntityKey() 1134 */ 1135 @Override 1136 public String getEntityKey() { 1137 return null; 1138 } 1139 1140 /* (non-Javadoc) 1141 * @see org.apache.oozie.command.XCommand#isLockRequired() 1142 */ 1143 @Override 1144 protected boolean isLockRequired() { 1145 return false; 1146 } 1147 1148 /* (non-Javadoc) 1149 * @see org.apache.oozie.command.XCommand#loadState() 1150 */ 1151 @Override 1152 protected void loadState() throws CommandException { 1153 jpaService = Services.get().get(JPAService.class); 1154 if (jpaService == null) { 1155 throw new CommandException(ErrorCode.E0610); 1156 } 1157 coordJob = new CoordinatorJobBean(); 1158 if (this.bundleId != null) { 1159 // this coord job is created from bundle 1160 coordJob.setBundleId(this.bundleId); 1161 // first use bundle id if submit thru bundle 1162 logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId); 1163 LogUtils.setLogInfo(logInfo); 1164 } 1165 if (this.coordName != null) { 1166 // this coord job is created from bundle 1167 coordJob.setAppName(this.coordName); 1168 } 1169 setJob(coordJob); 1170 1171 } 1172 1173 /* (non-Javadoc) 1174 * @see 
org.apache.oozie.command.XCommand#verifyPrecondition()
     *
     * No precondition is checked for this command.
     */
    @Override
    protected void verifyPrecondition() throws CommandException {
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     *
     * When the coordinator job belongs to a bundle, runs a bundle status update for it.
     */
    @Override
    public void notifyParent() throws CommandException {
        if (coordJob.getBundleId() == null) {
            // not submitted through a bundle, nothing to update
            return;
        }
        // update bundle action
        LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
        new BundleStatusUpdateXCommand(coordJob, prevStatus).call();
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     *
     * Intentionally empty.
     */
    @Override
    public void updateJob() throws CommandException {
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     *
     * Returns the coordinator job bean handled by this command.
     */
    @Override
    public Job getJob() {
        return coordJob;
    }

    /*
     * Intentionally empty.
     */
    @Override
    public void performWrites() throws CommandException {
    }
}