001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.InputStreamReader; 022 import java.io.Reader; 023 import java.io.StringReader; 024 import java.io.StringWriter; 025 import java.net.URI; 026 import java.net.URISyntaxException; 027 import java.text.DateFormat; 028 import java.text.SimpleDateFormat; 029 import java.util.ArrayList; 030 import java.util.Date; 031 import java.util.HashMap; 032 import java.util.HashSet; 033 import java.util.Iterator; 034 import java.util.List; 035 import java.util.Map; 036 import java.util.Set; 037 import java.util.TimeZone; 038 import java.util.TreeSet; 039 040 import javax.xml.transform.stream.StreamSource; 041 import javax.xml.validation.Validator; 042 043 import org.apache.hadoop.conf.Configuration; 044 import org.apache.hadoop.fs.FileSystem; 045 import org.apache.hadoop.fs.Path; 046 import org.apache.oozie.CoordinatorJobBean; 047 import org.apache.oozie.ErrorCode; 048 import org.apache.oozie.client.CoordinatorJob; 049 import org.apache.oozie.client.Job; 050 import org.apache.oozie.client.OozieClient; 051 import org.apache.oozie.client.CoordinatorJob.Execution; 052 import org.apache.oozie.command.CommandException; 053 import org.apache.oozie.command.SubmitTransitionXCommand; 054 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 055 import org.apache.oozie.coord.CoordELEvaluator; 056 import org.apache.oozie.coord.CoordELFunctions; 057 import org.apache.oozie.coord.CoordinatorJobException; 058 import org.apache.oozie.coord.TimeUnit; 059 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; 060 import org.apache.oozie.executor.jpa.JPAExecutorException; 061 import org.apache.oozie.service.DagXLogInfoService; 062 import org.apache.oozie.service.HadoopAccessorException; 063 import org.apache.oozie.service.HadoopAccessorService; 064 import org.apache.oozie.service.JPAService; 065 import org.apache.oozie.service.SchemaService; 066 import org.apache.oozie.service.Service; 067 import org.apache.oozie.service.Services; 068 import org.apache.oozie.service.UUIDService; 069 import org.apache.oozie.service.WorkflowAppService; 070 import org.apache.oozie.service.SchemaService.SchemaName; 071 import org.apache.oozie.service.UUIDService.ApplicationType; 072 import org.apache.oozie.util.ConfigUtils; 073 import org.apache.oozie.util.DateUtils; 074 import org.apache.oozie.util.ELEvaluator; 075 import org.apache.oozie.util.IOUtils; 076 import org.apache.oozie.util.InstrumentUtils; 077 import org.apache.oozie.util.LogUtils; 078 import org.apache.oozie.util.ParamChecker; 079 import org.apache.oozie.util.PropertiesUtils; 080 import org.apache.oozie.util.XConfiguration; 081 import org.apache.oozie.util.XLog; 082 import org.apache.oozie.util.XmlUtils; 083 import org.jdom.Attribute; 084 import org.jdom.Element; 085 import org.jdom.JDOMException; 086 import org.jdom.Namespace; 087 import org.xml.sax.SAXException; 088 089 /** 090 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB 091 * table. 092 * <p/> 093 * Specifically it performs the following functions: 1. Resolve all the variables or properties using job 094 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML 095 * at runtime. 096 */ 097 public class CoordSubmitXCommand extends SubmitTransitionXCommand { 098 099 private Configuration conf; 100 private final String authToken; 101 private final String bundleId; 102 private final String coordName; 103 private boolean dryrun; 104 private JPAService jpaService = null; 105 private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP; 106 107 public static final String CONFIG_DEFAULT = "coord-config-default.xml"; 108 public static final String COORDINATOR_XML_FILE = "coordinator.xml"; 109 public final String COORD_INPUT_EVENTS ="input-events"; 110 public final String COORD_OUTPUT_EVENTS = "output-events"; 111 public final String COORD_INPUT_EVENTS_DATA_IN ="data-in"; 112 public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out"; 113 114 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 115 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 116 117 private CoordinatorJobBean coordJob = null; 118 /** 119 * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout 120 */ 121 public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout"; 122 123 public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency"; 124 125 public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle"; 126 127 public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX 128 + "coord.materialization.throttling.factor"; 129 130 /** 131 * Default MAX timeout in minutes, after which coordinator input check will timeout 132 */ 133 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; 134 135 136 public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size"; 137 138 private ELEvaluator evalFreq = null; 139 private ELEvaluator evalNofuncs = null; 140 private ELEvaluator evalData = null; 141 private ELEvaluator evalInst = null; 142 private ELEvaluator evalAction = null; 143 private ELEvaluator evalSla = null; 144 145 static { 146 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 147 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 148 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 149 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 150 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 151 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 152 153 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 154 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 155 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 156 } 157 158 /** 159 * Constructor to create the Coordinator Submit Command. 160 * 161 * @param conf : Configuration for Coordinator job 162 * @param authToken : To be used for authentication 163 */ 164 public CoordSubmitXCommand(Configuration conf, String authToken) { 165 super("coord_submit", "coord_submit", 1); 166 this.conf = ParamChecker.notNull(conf, "conf"); 167 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 168 this.bundleId = null; 169 this.coordName = null; 170 } 171 172 /** 173 * Constructor to create the Coordinator Submit Command by bundle job. 174 * 175 * @param conf : Configuration for Coordinator job 176 * @param authToken : To be used for authentication 177 * @param bundleId : bundle id 178 * @param coordName : coord name 179 */ 180 public CoordSubmitXCommand(Configuration conf, String authToken, String bundleId, String coordName) { 181 super("coord_submit", "coord_submit", 1); 182 this.conf = ParamChecker.notNull(conf, "conf"); 183 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 184 this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId"); 185 this.coordName = ParamChecker.notEmpty(coordName, "coordName"); 186 } 187 188 /** 189 * Constructor to create the Coordinator Submit Command. 190 * 191 * @param dryrun : if dryrun 192 * @param conf : Configuration for Coordinator job 193 * @param authToken : To be used for authentication 194 */ 195 public CoordSubmitXCommand(boolean dryrun, Configuration conf, String authToken) { 196 this(conf, authToken); 197 this.dryrun = dryrun; 198 } 199 200 /* (non-Javadoc) 201 * @see org.apache.oozie.command.XCommand#execute() 202 */ 203 @Override 204 protected String submit() throws CommandException { 205 String jobId = null; 206 LOG.info("STARTED Coordinator Submit"); 207 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 208 209 boolean exceptionOccured = false; 210 try { 211 mergeDefaultConfig(); 212 213 String appXml = readAndValidateXml(); 214 coordJob.setOrigJobXml(appXml); 215 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 216 217 String appNamespace = readAppNamespace(appXml); 218 coordJob.setAppNamespace(appNamespace); 219 220 appXml = XmlUtils.removeComments(appXml); 221 initEvaluators(); 222 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob); 223 224 // checking if the coordinator application data input/output events 225 // specify multiple data instance values in erroneous manner 226 checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN); 227 checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT); 228 229 LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString()); 230 231 jobId = storeToDB(eJob, coordJob); 232 // log job info for coordinator job 233 LogUtils.setLogInfo(coordJob, logInfo); 234 LOG = XLog.resetPrefix(LOG); 235 236 if (!dryrun) { 237 // submit a command to materialize jobs for the next 1 hour (3600 secs) 238 // so we don't wait 10 mins for the Service to run. 239 queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100); 240 } 241 else { 242 Date startTime = coordJob.getStartTime(); 243 long startTimeMilli = startTime.getTime(); 244 long endTimeMilli = startTimeMilli + (3600 * 1000); 245 Date jobEndTime = coordJob.getEndTime(); 246 Date endTime = new Date(endTimeMilli); 247 if (endTime.compareTo(jobEndTime) > 0) { 248 endTime = jobEndTime; 249 } 250 jobId = coordJob.getId(); 251 LOG.info("[" + jobId + "]: Update status to RUNNING"); 252 coordJob.setStatus(Job.Status.RUNNING); 253 coordJob.setPending(); 254 CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime, 255 endTime); 256 Configuration jobConf = null; 257 try { 258 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 259 } 260 catch (IOException e1) { 261 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1); 262 } 263 String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null); 264 String output = coordJob.getJobXml() + System.getProperty("line.separator") 265 + "***actions for instance***" + action; 266 return output; 267 } 268 } 269 catch (CoordinatorJobException cex) { 270 exceptionOccured = true; 271 LOG.warn("ERROR: ", cex); 272 throw new CommandException(cex); 273 } 274 catch (IllegalArgumentException iex) { 275 exceptionOccured = true; 276 LOG.warn("ERROR: ", iex); 277 throw new CommandException(ErrorCode.E1003, iex); 278 } 279 catch (Exception ex) { 280 exceptionOccured = true; 281 LOG.warn("ERROR: ", ex); 282 throw new CommandException(ErrorCode.E0803, ex); 283 } 284 finally { 285 if (exceptionOccured) { 286 if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){ 287 coordJob.setStatus(CoordinatorJob.Status.FAILED); 288 coordJob.resetPending(); 289 } 290 } 291 } 292 293 LOG.info("ENDED Coordinator Submit jobId=" + jobId); 294 return jobId; 295 } 296 297 /* 298 * Check against multiple data instance values inside a single <instance> tag 299 * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list 300 */ 301 private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException { 302 Element eventsSpec, dataSpec, instance; 303 List<Element> instanceSpecList; 304 Namespace ns = eCoordJob.getNamespace(); 305 String instanceValue; 306 eventsSpec = eCoordJob.getChild(eventType, ns); 307 if (eventsSpec != null) { 308 dataSpec = eventsSpec.getChild(dataType, ns); 309 if (dataSpec != null) { 310 // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors 311 instanceSpecList = dataSpec.getChildren("instance", ns); 312 Iterator instanceIter = instanceSpecList.iterator(); 313 while(instanceIter.hasNext()) { 314 instance = ((Element) instanceIter.next()); 315 if(instance.getContentSize() == 0) { //empty string or whitespace 316 throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!"); 317 } 318 instanceValue = instance.getContent(0).toString(); 319 boolean isInvalid = false; 320 try { 321 isInvalid = evalAction.checkForExistence(instanceValue, ","); 322 } catch (Exception e) { 323 handleELParseException(eventType, dataType, instanceValue); 324 } 325 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0 326 handleExpresionWithMultipleInstances(eventType, dataType, instanceValue); 327 } 328 } 329 } 330 } 331 } 332 333 private void handleELParseException(String eventType, String dataType, String instanceValue) 334 throws CoordinatorJobException { 335 String correctAction = null; 336 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 337 correctAction = "Coordinator app definition should have valid <instance> tag for data-in"; 338 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 339 correctAction = "Coordinator app definition should have valid <instance> tag for data-out"; 340 } 341 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 342 + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction); 343 } 344 345 private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue) 346 throws CoordinatorJobException { 347 String correctAction = null; 348 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 349 correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance"; 350 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 351 correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance"; 352 } 353 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 354 + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction); 355 } 356 357 /** 358 * Read the application XML and validate against coordinator Schema 359 * 360 * @return validated coordinator XML 361 * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml 362 */ 363 private String readAndValidateXml() throws CoordinatorJobException { 364 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH), 365 OozieClient.COORDINATOR_APP_PATH); 366 String coordXml = readDefinition(appPath); 367 validateXml(coordXml); 368 return coordXml; 369 } 370 371 /** 372 * Validate against Coordinator XSD file 373 * 374 * @param xmlContent : Input coordinator xml 375 * @throws CoordinatorJobException thrown if unable to validate coordinator xml 376 */ 377 private void validateXml(String xmlContent) throws CoordinatorJobException { 378 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR); 379 Validator validator = schema.newValidator(); 380 try { 381 validator.validate(new StreamSource(new StringReader(xmlContent))); 382 } 383 catch (SAXException ex) { 384 LOG.warn("SAXException :", ex); 385 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex); 386 } 387 catch (IOException ex) { 388 LOG.warn("IOException :", ex); 389 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex); 390 } 391 } 392 393 /** 394 * Read the application XML schema namespace 395 * 396 * @param xmlContent input coordinator xml 397 * @return app xml namespace 398 * @throws CoordinatorJobException 399 */ 400 private String readAppNamespace(String xmlContent) throws CoordinatorJobException { 401 try { 402 Element coordXmlElement = XmlUtils.parseXml(xmlContent); 403 Namespace ns = coordXmlElement.getNamespace(); 404 if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 405 throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace " 406 + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later"); 407 } 408 if (ns != null) { 409 return ns.getURI(); 410 } 411 else { 412 throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given"); 413 } 414 } 415 catch (JDOMException ex) { 416 LOG.warn("JDOMException :", ex); 417 throw new CoordinatorJobException(ErrorCode.E0700, ex.getMessage(), ex); 418 } 419 } 420 421 /** 422 * Merge default configuration with user-defined configuration. 423 * 424 * @throws CommandException thrown if failed to read or merge configurations 425 */ 426 protected void mergeDefaultConfig() throws CommandException { 427 Path configDefault = null; 428 try { 429 String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH); 430 Path coordAppPath = new Path(coordAppPathStr); 431 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 432 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 433 Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority()); 434 FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf); 435 436 // app path could be a directory 437 if (!fs.isFile(coordAppPath)) { 438 configDefault = new Path(coordAppPath, CONFIG_DEFAULT); 439 } else { 440 configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT); 441 } 442 443 if (fs.exists(configDefault)) { 444 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 445 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 446 XConfiguration.injectDefaults(defaultConf, conf); 447 } 448 else { 449 LOG.info("configDefault Doesn't exist " + configDefault); 450 } 451 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 452 453 // Resolving all variables in the job properties. 454 // This ensures the Hadoop Configuration semantics is preserved. 455 XConfiguration resolvedVarsConf = new XConfiguration(); 456 for (Map.Entry<String, String> entry : conf) { 457 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 458 } 459 conf = resolvedVarsConf; 460 } 461 catch (IOException e) { 462 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 463 + configDefault, e); 464 } 465 catch (HadoopAccessorException e) { 466 throw new CommandException(e); 467 } 468 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 469 } 470 471 /** 472 * The method resolve all the variables that are defined in configuration. It also include the data set definition 473 * from dataset file into XML. 474 * 475 * @param appXml : Original job XML 476 * @param conf : Configuration of the job 477 * @param coordJob : Coordinator job bean to be populated. 478 * @return Resolved and modified job XML element. 479 * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets 480 * @throws Exception thrown if failed to resolve basic entities or include referred datasets 481 */ 482 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob) 483 throws CoordinatorJobException, Exception { 484 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob); 485 includeDataSets(basicResolvedApp, conf); 486 return basicResolvedApp; 487 } 488 489 /** 490 * Insert data set into data-in and data-out tags. 491 * 492 * @param eAppXml : coordinator application XML 493 * @param eDatasets : DataSet XML 494 */ 495 @SuppressWarnings("unchecked") 496 private void insertDataSet(Element eAppXml, Element eDatasets) { 497 // Adding DS definition in the coordinator XML 498 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 499 if (inputList != null) { 500 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 501 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset")); 502 dataIn.getContent().add(0, eDataset); 503 } 504 } 505 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 506 if (outputList != null) { 507 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 508 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset")); 509 dataOut.getContent().add(0, eDataset); 510 } 511 } 512 } 513 514 /** 515 * Find a specific dataset from a list of Datasets. 516 * 517 * @param eDatasets : List of data sets 518 * @param name : queried data set name 519 * @return one Dataset element. otherwise throw Exception 520 */ 521 @SuppressWarnings("unchecked") 522 private static Element findDataSet(Element eDatasets, String name) { 523 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 524 if (eDataset.getAttributeValue("name").equals(name)) { 525 eDataset = (Element) eDataset.clone(); 526 eDataset.detach(); 527 return eDataset; 528 } 529 } 530 throw new RuntimeException("undefined dataset: " + name); 531 } 532 533 /** 534 * Initialize all the required EL Evaluators. 535 */ 536 protected void initEvaluators() { 537 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq"); 538 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs"); 539 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances"); 540 evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit"); 541 evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start"); 542 } 543 544 /** 545 * Resolve basic entities using job Configuration. 546 * 547 * @param conf :Job configuration 548 * @param appXml : Original job XML 549 * @param coordJob : Coordinator job bean to be populated. 550 * @return Resolved job XML element. 551 * @throws CoordinatorJobException thrown if failed to resolve basic entities 552 * @throws Exception thrown if failed to resolve basic entities 553 */ 554 @SuppressWarnings("unchecked") 555 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob) 556 throws CoordinatorJobException, Exception { 557 Element eAppXml = XmlUtils.parseXml(appXml); 558 // job's main attributes 559 // frequency 560 String val = resolveAttribute("frequency", eAppXml, evalFreq); 561 int ival = ParamChecker.checkInteger(val, "frequency"); 562 ParamChecker.checkGTZero(ival, "frequency"); 563 coordJob.setFrequency(ival); 564 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq 565 .getVariable("timeunit")); 566 addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); 567 // TimeUnit 568 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString())); 569 // End Of Duration 570 tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq 571 .getVariable("endOfDuration")); 572 addAnAttribute("end_of_duration", eAppXml, tmp.toString()); 573 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean 574 575 // Application name 576 if (this.coordName == null) { 577 val = resolveAttribute("name", eAppXml, evalNofuncs); 578 coordJob.setAppName(val); 579 } 580 else { 581 // this coord job is created from bundle 582 coordJob.setAppName(this.coordName); 583 } 584 585 // start time 586 val = resolveAttribute("start", eAppXml, evalNofuncs); 587 ParamChecker.checkUTC(val, "start"); 588 coordJob.setStartTime(DateUtils.parseDateUTC(val)); 589 // end time 590 val = resolveAttribute("end", eAppXml, evalNofuncs); 591 ParamChecker.checkUTC(val, "end"); 592 coordJob.setEndTime(DateUtils.parseDateUTC(val)); 593 // Time zone 594 val = resolveAttribute("timezone", eAppXml, evalNofuncs); 595 ParamChecker.checkTimeZone(val, "timezone"); 596 coordJob.setTimeZone(val); 597 598 // controls 599 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 600 if (val == "") { 601 val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL); 602 } 603 604 ival = ParamChecker.checkInteger(val, "timeout"); 605 if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) { 606 ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600); 607 } 608 coordJob.setTimeout(ival); 609 610 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 611 if (val == null || val.isEmpty()) { 612 val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1"); 613 } 614 ival = ParamChecker.checkInteger(val, "concurrency"); 615 coordJob.setConcurrency(ival); 616 617 val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 618 if (val == null || val.isEmpty()) { 619 int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12); 620 ival = defaultThrottle; 621 } 622 else { 623 ival = ParamChecker.checkInteger(val, "throttle"); 624 } 625 int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000); 626 float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f); 627 int maxThrottle = (int) (maxQueue * factor); 628 if (ival > maxThrottle || ival < 1) { 629 ival = maxThrottle; 630 } 631 LOG.debug("max throttle " + ival); 632 coordJob.setMatThrottling(ival); 633 634 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 635 if (val == "") { 636 val = Execution.FIFO.toString(); 637 } 638 coordJob.setExecution(Execution.valueOf(val)); 639 String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() }; 640 ParamChecker.isMember(val, acceptedVals, "execution"); 641 642 // datasets 643 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs); 644 // for each data set 645 resolveDataSets(eAppXml); 646 HashMap<String, String> dataNameList = new HashMap<String, String>(); 647 resolveIOEvents(eAppXml, dataNameList); 648 649 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 650 eAppXml.getNamespace()), evalNofuncs); 651 // TODO: If action or workflow tag is missing, NullPointerException will 652 // occur 653 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 654 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace()); 655 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList); 656 if (configElem != null) { 657 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 658 resolveTagContents("name", propElem, evalData); 659 // Want to check the data-integrity but don't want to modify the 660 // XML 661 // for properties only 662 Element tmpProp = (Element) propElem.clone(); 663 resolveTagContents("value", tmpProp, evalData); 664 } 665 } 666 resolveSLA(eAppXml, coordJob); 667 return eAppXml; 668 } 669 670 /** 671 * Resolve SLA events 672 * 673 * @param eAppXml job XML 674 * @param coordJob coordinator job bean 675 * @throws CommandException thrown if failed to resolve sla events 676 */ 677 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException { 678 Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info", 679 Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 680 681 if (eSla != null) { 682 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 683 try { 684 // EL evaluation 685 slaXml = evalSla.evaluate(slaXml, String.class); 686 // Validate against semantic SXD 687 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 688 } 689 catch (Exception e) { 690 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e); 691 } 692 } 693 } 694 695 /** 696 * Resolve input-events/data-in and output-events/data-out tags. 697 * 698 * @param eJob : Job element 699 * @throws CoordinatorJobException thrown if failed to resolve input and output events 700 */ 701 @SuppressWarnings("unchecked") 702 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException { 703 // Resolving input-events/data-in 704 // Clone the job and don't update anything in the original 705 Element eJob = (Element) eJobOrg.clone(); 706 Element inputList = eJob.getChild("input-events", eJob.getNamespace()); 707 if (inputList != null) { 708 TreeSet<String> eventNameSet = new TreeSet<String>(); 709 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) { 710 String dataInName = dataIn.getAttributeValue("name"); 711 dataNameList.put(dataInName, "data-in"); 712 // check whether there is any duplicate data-in name 713 if (eventNameSet.contains(dataInName)) { 714 throw new RuntimeException("Duplicate dataIn name " + dataInName); 715 } 716 else { 717 eventNameSet.add(dataInName); 718 } 719 resolveTagContents("instance", dataIn, evalInst); 720 resolveTagContents("start-instance", dataIn, evalInst); 721 resolveTagContents("end-instance", dataIn, evalInst); 722 } 723 } 724 // Resolving output-events/data-out 725 Element outputList = eJob.getChild("output-events", eJob.getNamespace()); 726 if (outputList != null) { 727 TreeSet<String> eventNameSet = new TreeSet<String>(); 728 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) { 729 String dataOutName = dataOut.getAttributeValue("name"); 730 dataNameList.put(dataOutName, "data-out"); 731 // check whether there is any duplicate data-out name 732 if (eventNameSet.contains(dataOutName)) { 733 throw new RuntimeException("Duplicate dataIn name " + dataOutName); 734 } 735 else { 736 eventNameSet.add(dataOutName); 737 } 738 resolveTagContents("instance", dataOut, evalInst); 739 } 740 } 741 742 } 743 744 /** 745 * Add an attribute into XML element. 746 * 747 * @param attrName :attribute name 748 * @param elem : Element to add attribute 749 * @param value :Value of attribute 750 */ 751 private void addAnAttribute(String attrName, Element elem, String value) { 752 elem.setAttribute(attrName, value); 753 } 754 755 /** 756 * Resolve datasets using job configuration. 757 * 758 * @param eAppXml : Job Element XML 759 * @throws Exception thrown if failed to resolve datasets 760 */ 761 @SuppressWarnings("unchecked") 762 private void resolveDataSets(Element eAppXml) throws Exception { 763 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace()); 764 if (datasetList != null) { 765 766 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace()); 767 resolveDataSets(dsElems); 768 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 769 eAppXml.getNamespace()), evalNofuncs); 770 } 771 } 772 773 /** 774 * Resolve datasets using job configuration. 775 * 776 * @param dsElems : Data set XML element. 777 * @throws CoordinatorJobException thrown if failed to resolve datasets 778 */ 779 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException { 780 for (Element dsElem : dsElems) { 781 // Setting up default TimeUnit and EndOFDuraion 782 evalFreq.setVariable("timeunit", TimeUnit.MINUTE); 783 evalFreq.setVariable("endOfDuration", TimeUnit.NONE); 784 785 String val = resolveAttribute("frequency", dsElem, evalFreq); 786 int ival = ParamChecker.checkInteger(val, "frequency"); 787 ParamChecker.checkGTZero(ival, "frequency"); 788 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE 789 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString()); 790 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE 791 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString()); 792 val = resolveAttribute("initial-instance", dsElem, evalNofuncs); 793 ParamChecker.checkUTC(val, "initial-instance"); 794 checkInitialInstance(val); 795 val = resolveAttribute("timezone", dsElem, evalNofuncs); 796 ParamChecker.checkTimeZone(val, "timezone"); 797 resolveTagContents("uri-template", dsElem, evalNofuncs); 798 resolveTagContents("done-flag", dsElem, evalNofuncs); 799 } 800 } 801 802 /** 803 * Resolve the content of a tag. 804 * 805 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout> 806 * @param elem : Element where the tag exists. 807 * @param eval : EL evealuator 808 * @return Resolved tag content. 809 * @throws CoordinatorJobException thrown if failed to resolve tag content 810 */ 811 @SuppressWarnings("unchecked") 812 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 813 String ret = ""; 814 if (elem != null) { 815 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) { 816 if (tagElem != null) { 817 String updated; 818 try { 819 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim()); 820 821 } 822 catch (Exception e) { 823 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 824 } 825 tagElem.removeContent(); 826 tagElem.addContent(updated); 827 ret += updated; 828 } 829 } 830 } 831 return ret; 832 } 833 834 /** 835 * Resolve an attribute value. 836 * 837 * @param attrName : Attribute name. 838 * @param elem : XML Element where attribute is defiend 839 * @param eval : ELEvaluator used to resolve 840 * @return Resolved attribute value 841 * @throws CoordinatorJobException thrown if failed to resolve an attribute value 842 */ 843 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 844 Attribute attr = elem.getAttribute(attrName); 845 String val = null; 846 if (attr != null) { 847 try { 848 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim()); 849 } 850 catch (Exception e) { 851 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 852 } 853 attr.setValue(val); 854 } 855 return val; 856 } 857 858 /** 859 * Include referred datasets into XML. 860 * 861 * @param resolvedXml : Job XML element. 862 * @param conf : Job configuration 863 * @throws CoordinatorJobException thrown if failed to include referred datasets into XML 864 */ 865 @SuppressWarnings("unchecked") 866 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException { 867 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace()); 868 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace()); 869 List<String> dsList = new ArrayList<String>(); 870 if (datasets != null) { 871 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) { 872 String incDSFile = includeElem.getTextTrim(); 873 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace()); 874 } 875 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) { 876 String dsName = e.getAttributeValue("name"); 877 if (dsList.contains(dsName)) {// Override with this DS 878 // Remove old DS 879 removeDataSet(allDataSets, dsName); 880 } 881 else { 882 dsList.add(dsName); 883 } 884 allDataSets.addContent((Element) e.clone()); 885 } 886 } 887 insertDataSet(resolvedXml, allDataSets); 888 resolvedXml.removeChild("datasets", resolvedXml.getNamespace()); 889 } 890 891 /** 892 * Include one dataset file. 893 * 894 * @param incDSFile : Include data set filename. 895 * @param dsList :List of dataset names to verify the duplicate. 896 * @param allDataSets : Element that includes all dataset definitions. 897 * @param dsNameSpace : Data set name space 898 * @throws CoordinatorJobException thrown if failed to include one dataset file 899 */ 900 @SuppressWarnings("unchecked") 901 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace) 902 throws CoordinatorJobException { 903 Element tmpDataSets = null; 904 try { 905 String dsXml = readDefinition(incDSFile); 906 LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml); 907 tmpDataSets = XmlUtils.parseXml(dsXml); 908 } 909 catch (JDOMException e) { 910 LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage()); 911 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage()); 912 } 913 resolveDataSets(tmpDataSets.getChildren("dataset")); 914 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) { 915 String dsName = e.getAttributeValue("name"); 916 if (dsList.contains(dsName)) { 917 throw new RuntimeException("Duplicate Dataset " + dsName); 918 } 919 dsList.add(dsName); 920 Element tmp = (Element) e.clone(); 921 // TODO: Don't like to over-write the external/include DS's namespace 922 tmp.setNamespace(dsNameSpace); 923 tmp.getChild("uri-template").setNamespace(dsNameSpace); 924 if (e.getChild("done-flag") != null) { 925 tmp.getChild("done-flag").setNamespace(dsNameSpace); 926 } 927 allDataSets.addContent(tmp); 928 } 929 // nested include 930 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) { 931 String incFile = includeElem.getTextTrim(); 932 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace); 933 } 934 } 935 936 /** 937 * Remove a dataset from a list of dataset. 938 * 939 * @param eDatasets : List of dataset 940 * @param name : Dataset name to be removed. 941 */ 942 @SuppressWarnings("unchecked") 943 private static void removeDataSet(Element eDatasets, String name) { 944 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 945 if (eDataset.getAttributeValue("name").equals(name)) { 946 eDataset.detach(); 947 } 948 } 949 throw new RuntimeException("undefined dataset: " + name); 950 } 951 952 /** 953 * Read coordinator definition. 954 * 955 * @param appPath application path. 956 * @return coordinator definition. 957 * @throws CoordinatorJobException thrown if the definition could not be read. 958 */ 959 protected String readDefinition(String appPath) throws CoordinatorJobException { 960 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 961 // Configuration confHadoop = CoordUtils.getHadoopConf(conf); 962 try { 963 URI uri = new URI(appPath); 964 LOG.debug("user =" + user); 965 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 966 Configuration fsConf = has.createJobConf(uri.getAuthority()); 967 FileSystem fs = has.createFileSystem(user, uri, fsConf); 968 Path appDefPath = null; 969 970 // app path could be a directory 971 Path path = new Path(uri.getPath()); 972 // check file exists for dataset include file, app xml already checked 973 if (!fs.exists(path)) { 974 throw new URISyntaxException(path.toString(), "path not existed : " + path.toString()); 975 } 976 if (!fs.isFile(path)) { 977 appDefPath = new Path(path, COORDINATOR_XML_FILE); 978 } else { 979 appDefPath = path; 980 } 981 982 Reader reader = new InputStreamReader(fs.open(appDefPath)); 983 StringWriter writer = new StringWriter(); 984 IOUtils.copyCharStream(reader, writer); 985 return writer.toString(); 986 } 987 catch (IOException ex) { 988 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 989 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 990 } 991 catch (URISyntaxException ex) { 992 LOG.warn("URISyException :" + ex.getMessage()); 993 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex); 994 } 995 catch (HadoopAccessorException ex) { 996 throw new CoordinatorJobException(ex); 997 } 998 catch (Exception ex) { 999 LOG.warn("Exception :", ex); 1000 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1001 } 1002 } 1003 1004 /** 1005 * Write a coordinator job into database 1006 * 1007 * @param eJob : XML element of job 1008 * @param coordJob : Coordinator job bean 1009 * @return Job id 1010 * @throws CommandException thrown if unable to save coordinator job to db 1011 */ 1012 private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException { 1013 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR); 1014 coordJob.setId(jobId); 1015 coordJob.setAuthToken(this.authToken); 1016 1017 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH)); 1018 coordJob.setCreatedTime(new Date()); 1019 coordJob.setUser(conf.get(OozieClient.USER_NAME)); 1020 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 1021 coordJob.setGroup(group); 1022 coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 1023 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString()); 1024 coordJob.setLastActionNumber(0); 1025 coordJob.setLastModifiedTime(new Date()); 1026 1027 if (!dryrun) { 1028 coordJob.setLastModifiedTime(new Date()); 1029 try { 1030 jpaService.execute(new CoordJobInsertJPAExecutor(coordJob)); 1031 } 1032 catch (JPAExecutorException je) { 1033 throw new CommandException(je); 1034 } 1035 } 1036 return jobId; 1037 } 1038 1039 /* 1040 * this method checks if the initial-instance specified for a particular 1041 is not a date earlier than the oozie server default Jan 01, 1970 00:00Z UTC 1042 */ 1043 private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException { 1044 Date initialInstance, givenInstance; 1045 DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'"); 1046 df.setTimeZone(TimeZone.getTimeZone("UTC")); 1047 try { 1048 initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z"); 1049 givenInstance = DateUtils.parseDateUTC(val); 1050 } 1051 catch (Exception e) { 1052 throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val + "' to Date object. ",e); 1053 } 1054 if(givenInstance.compareTo(initialInstance) < 0) { 1055 throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + df.format(givenInstance) + " is earlier than the default initial instance " + df.format(initialInstance)); 1056 } 1057 } 1058 1059 /* (non-Javadoc) 1060 * @see org.apache.oozie.command.XCommand#getEntityKey() 1061 */ 1062 @Override 1063 public String getEntityKey() { 1064 return null; 1065 } 1066 1067 /* (non-Javadoc) 1068 * @see org.apache.oozie.command.XCommand#isLockRequired() 1069 */ 1070 @Override 1071 protected boolean isLockRequired() { 1072 return false; 1073 } 1074 1075 /* (non-Javadoc) 1076 * @see org.apache.oozie.command.XCommand#loadState() 1077 */ 1078 @Override 1079 protected void loadState() throws CommandException { 1080 jpaService = Services.get().get(JPAService.class); 1081 if (jpaService == null) { 1082 throw new CommandException(ErrorCode.E0610); 1083 } 1084 coordJob = new CoordinatorJobBean(); 1085 if (this.bundleId != null) { 1086 // this coord job is created from bundle 1087 coordJob.setBundleId(this.bundleId); 1088 // first use bundle id if submit thru bundle 1089 logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId); 1090 LogUtils.setLogInfo(logInfo); 1091 } 1092 if (this.coordName != null) { 1093 // this coord job is created from bundle 1094 coordJob.setAppName(this.coordName); 1095 } 1096 setJob(coordJob); 1097 1098 } 1099 1100 /* (non-Javadoc) 1101 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 1102 */ 1103 @Override 1104 protected void verifyPrecondition() throws CommandException { 1105 1106 } 1107 1108 /* (non-Javadoc) 1109 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 1110 */ 1111 @Override 1112 public void notifyParent() throws CommandException { 1113 // update bundle action 1114 if (coordJob.getBundleId() != null) { 1115 LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId()); 1116 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 1117 bundleStatusUpdate.call(); 1118 } 1119 } 1120 1121 /* (non-Javadoc) 1122 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 1123 */ 1124 @Override 1125 public void updateJob() throws CommandException { 1126 } 1127 1128 /* (non-Javadoc) 1129 * @see org.apache.oozie.command.TransitionXCommand#getJob() 1130 */ 1131 @Override 1132 public Job getJob() { 1133 return coordJob; 1134 } 1135 }