001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.io.IOException; 021import java.io.InputStreamReader; 022import java.io.Reader; 023import java.io.StringReader; 024import java.io.StringWriter; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.util.ArrayList; 028import java.util.Calendar; 029import java.util.Date; 030import java.util.HashMap; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036import java.util.TreeSet; 037 038import javax.xml.transform.stream.StreamSource; 039import javax.xml.validation.Validator; 040 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.oozie.CoordinatorJobBean; 045import org.apache.oozie.ErrorCode; 046import org.apache.oozie.client.CoordinatorJob; 047import org.apache.oozie.client.Job; 048import org.apache.oozie.client.OozieClient; 049import org.apache.oozie.client.CoordinatorJob.Execution; 050import org.apache.oozie.command.CommandException; 051import 
org.apache.oozie.command.SubmitTransitionXCommand; 052import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 053import org.apache.oozie.coord.CoordELEvaluator; 054import org.apache.oozie.coord.CoordELFunctions; 055import org.apache.oozie.coord.CoordinatorJobException; 056import org.apache.oozie.coord.TimeUnit; 057import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 058import org.apache.oozie.executor.jpa.JPAExecutorException; 059import org.apache.oozie.service.DagXLogInfoService; 060import org.apache.oozie.service.HadoopAccessorException; 061import org.apache.oozie.service.HadoopAccessorService; 062import org.apache.oozie.service.JPAService; 063import org.apache.oozie.service.SchemaService; 064import org.apache.oozie.service.Service; 065import org.apache.oozie.service.Services; 066import org.apache.oozie.service.UUIDService; 067import org.apache.oozie.service.SchemaService.SchemaName; 068import org.apache.oozie.service.UUIDService.ApplicationType; 069import org.apache.oozie.util.ConfigUtils; 070import org.apache.oozie.util.DateUtils; 071import org.apache.oozie.util.ELEvaluator; 072import org.apache.oozie.util.ELUtils; 073import org.apache.oozie.util.IOUtils; 074import org.apache.oozie.util.InstrumentUtils; 075import org.apache.oozie.util.LogUtils; 076import org.apache.oozie.util.ParamChecker; 077import org.apache.oozie.util.ParameterVerifier; 078import org.apache.oozie.util.ParameterVerifierException; 079import org.apache.oozie.util.PropertiesUtils; 080import org.apache.oozie.util.XConfiguration; 081import org.apache.oozie.util.XLog; 082import org.apache.oozie.util.XmlUtils; 083import org.jdom.Attribute; 084import org.jdom.Element; 085import org.jdom.JDOMException; 086import org.jdom.Namespace; 087import org.xml.sax.SAXException; 088 089/** 090 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB 091 * table. 092 * <p/> 093 * Specifically it performs the following functions: 1. 
Resolve all the variables or properties using job 094 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML 095 * at runtime. 096 */ 097public class CoordSubmitXCommand extends SubmitTransitionXCommand { 098 099 protected Configuration conf; 100 private final String bundleId; 101 private final String coordName; 102 protected boolean dryrun; 103 protected JPAService jpaService = null; 104 private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP; 105 106 public static final String CONFIG_DEFAULT = "coord-config-default.xml"; 107 public static final String COORDINATOR_XML_FILE = "coordinator.xml"; 108 public final String COORD_INPUT_EVENTS ="input-events"; 109 public final String COORD_OUTPUT_EVENTS = "output-events"; 110 public final String COORD_INPUT_EVENTS_DATA_IN ="data-in"; 111 public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out"; 112 113 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 114 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 115 116 protected CoordinatorJobBean coordJob = null; 117 /** 118 * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout 119 */ 120 public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout"; 121 122 public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency"; 123 124 public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle"; 125 126 public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX 127 + "coord.materialization.throttling.factor"; 128 129 /** 130 * Default MAX timeout in minutes, after which coordinator input check will timeout 131 */ 132 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; 133 134 public static final 
String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size"; 135 136 public static final String CONF_CHECK_MAX_FREQUENCY = Service.CONF_PREFIX + "coord.check.maximum.frequency"; 137 138 private ELEvaluator evalFreq = null; 139 private ELEvaluator evalNofuncs = null; 140 private ELEvaluator evalData = null; 141 private ELEvaluator evalInst = null; 142 private ELEvaluator evalAction = null; 143 private ELEvaluator evalSla = null; 144 private ELEvaluator evalTimeout = null; 145 146 static { 147 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 148 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 149 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 150 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 151 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 152 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 153 154 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 155 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 156 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 157 } 158 159 /** 160 * Constructor to create the Coordinator Submit Command. 161 * 162 * @param conf : Configuration for Coordinator job 163 */ 164 public CoordSubmitXCommand(Configuration conf) { 165 super("coord_submit", "coord_submit", 1); 166 this.conf = ParamChecker.notNull(conf, "conf"); 167 this.bundleId = null; 168 this.coordName = null; 169 } 170 171 /** 172 * Constructor to create the Coordinator Submit Command by bundle job. 
173 * 174 * @param conf : Configuration for Coordinator job 175 * @param bundleId : bundle id 176 * @param coordName : coord name 177 */ 178 public CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) { 179 super("coord_submit", "coord_submit", 1); 180 this.conf = ParamChecker.notNull(conf, "conf"); 181 this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId"); 182 this.coordName = ParamChecker.notEmpty(coordName, "coordName"); 183 } 184 185 /** 186 * Constructor to create the Coordinator Submit Command. 187 * 188 * @param dryrun : if dryrun 189 * @param conf : Configuration for Coordinator job 190 */ 191 public CoordSubmitXCommand(boolean dryrun, Configuration conf) { 192 this(conf); 193 this.dryrun = dryrun; 194 } 195 196 /* (non-Javadoc) 197 * @see org.apache.oozie.command.XCommand#execute() 198 */ 199 @Override 200 protected String submit() throws CommandException { 201 LOG.info("STARTED Coordinator Submit"); 202 String jobId = submitJob(); 203 LOG.info("ENDED Coordinator Submit jobId=" + jobId); 204 return jobId; 205 } 206 207 protected String submitJob() throws CommandException { 208 String jobId = null; 209 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 210 211 boolean exceptionOccured = false; 212 try { 213 mergeDefaultConfig(); 214 215 String appXml = readAndValidateXml(); 216 coordJob.setOrigJobXml(appXml); 217 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 218 219 Element eXml = XmlUtils.parseXml(appXml); 220 221 String appNamespace = readAppNamespace(eXml); 222 coordJob.setAppNamespace(appNamespace); 223 224 ParameterVerifier.verifyParameters(conf, eXml); 225 226 appXml = XmlUtils.removeComments(appXml); 227 initEvaluators(); 228 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob); 229 230 validateCoordinatorJob(); 231 232 // checking if the coordinator application data input/output events 233 // specify multiple data instance values in erroneous manner 
234 checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN); 235 checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT); 236 237 LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString()); 238 239 jobId = storeToDB(appXml, eJob, coordJob); 240 // log job info for coordinator job 241 LogUtils.setLogInfo(coordJob, logInfo); 242 LOG = XLog.resetPrefix(LOG); 243 244 if (!dryrun) { 245 queueMaterializeTransitionXCommand(jobId); 246 } 247 else { 248 return getDryRun(coordJob); 249 } 250 } 251 catch (JDOMException jex) { 252 exceptionOccured = true; 253 LOG.warn("ERROR: ", jex); 254 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex); 255 } 256 catch (CoordinatorJobException cex) { 257 exceptionOccured = true; 258 LOG.warn("ERROR: ", cex); 259 throw new CommandException(cex); 260 } 261 catch (ParameterVerifierException pex) { 262 exceptionOccured = true; 263 LOG.warn("ERROR: ", pex); 264 throw new CommandException(pex); 265 } 266 catch (IllegalArgumentException iex) { 267 exceptionOccured = true; 268 LOG.warn("ERROR: ", iex); 269 throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex); 270 } 271 catch (Exception ex) { 272 exceptionOccured = true; 273 LOG.warn("ERROR: ", ex); 274 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 275 } 276 finally { 277 if (exceptionOccured) { 278 if (coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")) { 279 coordJob.setStatus(CoordinatorJob.Status.FAILED); 280 coordJob.resetPending(); 281 } 282 } 283 } 284 return jobId; 285 } 286 287 /** 288 * Gets the dryrun output. 
289 * 290 * @param jobId the job id 291 * @return the dry run 292 * @throws Exception the exception 293 */ 294 protected String getDryRun(CoordinatorJobBean coordJob) throws Exception{ 295 Date startTime = coordJob.getStartTime(); 296 long startTimeMilli = startTime.getTime(); 297 long endTimeMilli = startTimeMilli + (3600 * 1000); 298 Date jobEndTime = coordJob.getEndTime(); 299 Date endTime = new Date(endTimeMilli); 300 if (endTime.compareTo(jobEndTime) > 0) { 301 endTime = jobEndTime; 302 } 303 String jobId = coordJob.getId(); 304 LOG.info("[" + jobId + "]: Update status to RUNNING"); 305 coordJob.setStatus(Job.Status.RUNNING); 306 coordJob.setPending(); 307 CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime, 308 endTime); 309 Configuration jobConf = null; 310 try { 311 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 312 } 313 catch (IOException e1) { 314 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1); 315 } 316 String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null); 317 String output = coordJob.getJobXml() + System.getProperty("line.separator") 318 + "***actions for instance***" + action; 319 return output; 320 } 321 322 /** 323 * Queue MaterializeTransitionXCommand 324 */ 325 protected void queueMaterializeTransitionXCommand(String jobId) { 326 // submit a command to materialize jobs for the next 1 hour (3600 secs) 327 // so we don't wait 10 mins for the Service to run. 328 queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100); 329 } 330 331 /** 332 * Method that validates values in the definition for correctness. Placeholder to add more. 
333 */ 334 private void validateCoordinatorJob() throws Exception { 335 // check if startTime < endTime 336 if (!coordJob.getStartTime().before(coordJob.getEndTime())) { 337 throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time."); 338 } 339 340 try { 341 // Check if a coord job with cron frequency will materialize actions 342 int freq = Integer.parseInt(coordJob.getFrequency()); 343 344 // Check if the frequency is faster than 5 min if enabled 345 if (Services.get().getConf().getBoolean(CONF_CHECK_MAX_FREQUENCY, true)) { 346 CoordinatorJob.Timeunit unit = coordJob.getTimeUnit(); 347 if (freq == 0 || (freq < 5 && unit == CoordinatorJob.Timeunit.MINUTE)) { 348 throw new IllegalArgumentException("Coordinator job with frequency [" + freq + 349 "] minutes is faster than allowed maximum of 5 minutes (" 350 + CONF_CHECK_MAX_FREQUENCY + " is set to true)"); 351 } 352 } 353 } catch (NumberFormatException e) { 354 Date start = coordJob.getStartTime(); 355 Calendar cal = Calendar.getInstance(); 356 cal.setTime(start); 357 cal.add(Calendar.MINUTE, -1); 358 start = cal.getTime(); 359 360 Date nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(start, coordJob); 361 if (!nextTime.before(coordJob.getEndTime())) { 362 throw new IllegalArgumentException("Coordinator job with frequency '" + 363 coordJob.getFrequency() + "' materializes no actions between start and end time."); 364 } 365 } 366 } 367 368 /* 369 * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag 370 * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list 371 */ 372 private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException { 373 Element eventsSpec, dataSpec, instance; 374 List<Element> instanceSpecList; 375 Namespace ns = eCoordJob.getNamespace(); 376 String 
instanceValue; 377 eventsSpec = eCoordJob.getChild(eventType, ns); 378 if (eventsSpec != null) { 379 dataSpec = eventsSpec.getChild(dataType, ns); 380 if (dataSpec != null) { 381 // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors 382 instanceSpecList = dataSpec.getChildren("instance", ns); 383 Iterator instanceIter = instanceSpecList.iterator(); 384 while(instanceIter.hasNext()) { 385 instance = ((Element) instanceIter.next()); 386 if(instance.getContentSize() == 0) { //empty string or whitespace 387 throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!"); 388 } 389 instanceValue = instance.getContent(0).toString(); 390 boolean isInvalid = false; 391 try { 392 isInvalid = evalAction.checkForExistence(instanceValue, ","); 393 } catch (Exception e) { 394 handleELParseException(eventType, dataType, instanceValue); 395 } 396 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0 397 handleExpresionWithMultipleInstances(eventType, dataType, instanceValue); 398 } 399 } 400 401 // In case of input-events, there can be multiple child <start-instance> datasets. Iterating to ensure none of them have errors 402 instanceSpecList = dataSpec.getChildren("start-instance", ns); 403 instanceIter = instanceSpecList.iterator(); 404 while(instanceIter.hasNext()) { 405 instance = ((Element) instanceIter.next()); 406 if(instance.getContentSize() == 0) { //empty string or whitespace 407 throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!"); 408 } 409 instanceValue = instance.getContent(0).toString(); 410 boolean isInvalid = false; 411 try { 412 isInvalid = evalAction.checkForExistence(instanceValue, ","); 413 } catch (Exception e) { 414 handleELParseException(eventType, dataType, instanceValue); 415 } 416 if (isInvalid) { // reaching this block implies start instance is not empty i.e. 
length > 0 417 handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue); 418 } 419 } 420 421 // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors 422 instanceSpecList = dataSpec.getChildren("end-instance", ns); 423 instanceIter = instanceSpecList.iterator(); 424 while(instanceIter.hasNext()) { 425 instance = ((Element) instanceIter.next()); 426 if(instance.getContentSize() == 0) { //empty string or whitespace 427 throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!"); 428 } 429 instanceValue = instance.getContent(0).toString(); 430 boolean isInvalid = false; 431 try { 432 isInvalid = evalAction.checkForExistence(instanceValue, ","); 433 } catch (Exception e) { 434 handleELParseException(eventType, dataType, instanceValue); 435 } 436 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0 437 handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue); 438 } 439 } 440 441 } 442 } 443 } 444 445 private void handleELParseException(String eventType, String dataType, String instanceValue) 446 throws CoordinatorJobException { 447 String correctAction = null; 448 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 449 correctAction = "Coordinator app definition should have valid <instance> tag for data-in"; 450 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 451 correctAction = "Coordinator app definition should have valid <instance> tag for data-out"; 452 } 453 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 454 + "' is not valid. Coordinator job NOT SUBMITTED. 
" + correctAction); 455 } 456 457 private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue) 458 throws CoordinatorJobException { 459 String correctAction = null; 460 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 461 correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance"; 462 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 463 correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance"; 464 } 465 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 466 + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction); 467 } 468 469 private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue) 470 throws CoordinatorJobException { 471 String correctAction = "Coordinator app definition should not have multiple start-instances"; 472 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue 473 + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. " + correctAction); 474 } 475 476 private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue) 477 throws CoordinatorJobException { 478 String correctAction = "Coordinator app definition should not have multiple end-instances"; 479 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue 480 + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. 
" + correctAction); 481 } 482 /** 483 * Read the application XML and validate against coordinator Schema 484 * 485 * @return validated coordinator XML 486 * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml 487 */ 488 protected String readAndValidateXml() throws CoordinatorJobException { 489 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH), 490 OozieClient.COORDINATOR_APP_PATH); 491 String coordXml = readDefinition(appPath); 492 validateXml(coordXml); 493 return coordXml; 494 } 495 496 /** 497 * Validate against Coordinator XSD file 498 * 499 * @param xmlContent : Input coordinator xml 500 * @throws CoordinatorJobException thrown if unable to validate coordinator xml 501 */ 502 private void validateXml(String xmlContent) throws CoordinatorJobException { 503 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR); 504 Validator validator = schema.newValidator(); 505 try { 506 validator.validate(new StreamSource(new StringReader(xmlContent))); 507 } 508 catch (SAXException ex) { 509 LOG.warn("SAXException :", ex); 510 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex); 511 } 512 catch (IOException ex) { 513 LOG.warn("IOException :", ex); 514 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex); 515 } 516 } 517 518 /** 519 * Read the application XML schema namespace 520 * 521 * @param coordXmlElement input coordinator xml Element 522 * @return app xml namespace 523 * @throws CoordinatorJobException 524 */ 525 private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException { 526 Namespace ns = coordXmlElement.getNamespace(); 527 if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 528 throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace " 529 + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", 
please use 0.2 or later"); 530 } 531 if (ns != null) { 532 return ns.getURI(); 533 } 534 else { 535 throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given"); 536 } 537 } 538 539 /** 540 * Merge default configuration with user-defined configuration. 541 * 542 * @throws CommandException thrown if failed to read or merge configurations 543 */ 544 protected void mergeDefaultConfig() throws CommandException { 545 Path configDefault = null; 546 try { 547 String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH); 548 Path coordAppPath = new Path(coordAppPathStr); 549 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 550 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 551 Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority()); 552 FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf); 553 554 // app path could be a directory 555 if (!fs.isFile(coordAppPath)) { 556 configDefault = new Path(coordAppPath, CONFIG_DEFAULT); 557 } else { 558 configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT); 559 } 560 561 if (fs.exists(configDefault)) { 562 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 563 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 564 XConfiguration.injectDefaults(defaultConf, conf); 565 } 566 else { 567 LOG.info("configDefault Doesn't exist " + configDefault); 568 } 569 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 570 571 // Resolving all variables in the job properties. 572 // This ensures the Hadoop Configuration semantics is preserved. 
573 XConfiguration resolvedVarsConf = new XConfiguration(); 574 for (Map.Entry<String, String> entry : conf) { 575 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 576 } 577 conf = resolvedVarsConf; 578 } 579 catch (IOException e) { 580 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 581 + configDefault, e); 582 } 583 catch (HadoopAccessorException e) { 584 throw new CommandException(e); 585 } 586 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 587 } 588 589 /** 590 * The method resolve all the variables that are defined in configuration. It also include the data set definition 591 * from dataset file into XML. 592 * 593 * @param appXml : Original job XML 594 * @param conf : Configuration of the job 595 * @param coordJob : Coordinator job bean to be populated. 596 * @return Resolved and modified job XML element. 597 * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets 598 * @throws Exception thrown if failed to resolve basic entities or include referred datasets 599 */ 600 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob) 601 throws CoordinatorJobException, Exception { 602 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob); 603 includeDataSets(basicResolvedApp, conf); 604 return basicResolvedApp; 605 } 606 607 /** 608 * Insert data set into data-in and data-out tags. 
609 * 610 * @param eAppXml : coordinator application XML 611 * @param eDatasets : DataSet XML 612 */ 613 @SuppressWarnings("unchecked") 614 private void insertDataSet(Element eAppXml, Element eDatasets) { 615 // Adding DS definition in the coordinator XML 616 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 617 if (inputList != null) { 618 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 619 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset")); 620 dataIn.getContent().add(0, eDataset); 621 } 622 } 623 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 624 if (outputList != null) { 625 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 626 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset")); 627 dataOut.getContent().add(0, eDataset); 628 } 629 } 630 } 631 632 /** 633 * Find a specific dataset from a list of Datasets. 634 * 635 * @param eDatasets : List of data sets 636 * @param name : queried data set name 637 * @return one Dataset element. otherwise throw Exception 638 */ 639 @SuppressWarnings("unchecked") 640 private static Element findDataSet(Element eDatasets, String name) { 641 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 642 if (eDataset.getAttributeValue("name").equals(name)) { 643 eDataset = (Element) eDataset.clone(); 644 eDataset.detach(); 645 return eDataset; 646 } 647 } 648 throw new RuntimeException("undefined dataset: " + name); 649 } 650 651 /** 652 * Initialize all the required EL Evaluators. 
653 */ 654 protected void initEvaluators() { 655 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq"); 656 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs"); 657 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances"); 658 evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start"); 659 evalTimeout = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-wait-timeout"); 660 } 661 662 /** 663 * Resolve basic entities using job Configuration. 664 * 665 * @param conf :Job configuration 666 * @param appXml : Original job XML 667 * @param coordJob : Coordinator job bean to be populated. 668 * @return Resolved job XML element. 669 * @throws CoordinatorJobException thrown if failed to resolve basic entities 670 * @throws Exception thrown if failed to resolve basic entities 671 */ 672 @SuppressWarnings("unchecked") 673 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob) 674 throws CoordinatorJobException, Exception { 675 Element eAppXml = XmlUtils.parseXml(appXml); 676 // job's main attributes 677 // frequency 678 String val = resolveAttribute("frequency", eAppXml, evalFreq); 679 int ival = 0; 680 681 val = ParamChecker.checkFrequency(val); 682 coordJob.setFrequency(val); 683 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq 684 .getVariable("timeunit")); 685 try { 686 Integer.parseInt(val); 687 } 688 catch (NumberFormatException ex) { 689 tmp=TimeUnit.CRON; 690 } 691 692 addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); 693 // TimeUnit 694 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString())); 695 // End Of Duration 696 tmp = evalFreq.getVariable("endOfDuration") == null ? 
TimeUnit.NONE : ((TimeUnit) evalFreq 697 .getVariable("endOfDuration")); 698 addAnAttribute("end_of_duration", eAppXml, tmp.toString()); 699 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean 700 701 // Application name 702 if (this.coordName == null) { 703 String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf); 704 coordJob.setAppName(name); 705 } 706 else { 707 // this coord job is created from bundle 708 coordJob.setAppName(this.coordName); 709 } 710 711 // start time 712 val = resolveAttribute("start", eAppXml, evalNofuncs); 713 ParamChecker.checkDateOozieTZ(val, "start"); 714 coordJob.setStartTime(DateUtils.parseDateOozieTZ(val)); 715 // end time 716 val = resolveAttribute("end", eAppXml, evalNofuncs); 717 ParamChecker.checkDateOozieTZ(val, "end"); 718 coordJob.setEndTime(DateUtils.parseDateOozieTZ(val)); 719 // Time zone 720 val = resolveAttribute("timezone", eAppXml, evalNofuncs); 721 ParamChecker.checkTimeZone(val, "timezone"); 722 coordJob.setTimeZone(val); 723 724 // controls 725 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalTimeout); 726 if (val != null && val != "") { 727 int t = Integer.parseInt(val); 728 tmp = (evalTimeout.getVariable("timeunit") == null) ? 
TimeUnit.MINUTE : ((TimeUnit) evalTimeout 729 .getVariable("timeunit")); 730 switch (tmp) { 731 case HOUR: 732 val = String.valueOf(t * 60); 733 break; 734 case DAY: 735 val = String.valueOf(t * 60 * 24); 736 break; 737 case MONTH: 738 val = String.valueOf(t * 60 * 24 * 30); 739 break; 740 default: 741 break; 742 } 743 } 744 else { 745 val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL); 746 } 747 748 ival = ParamChecker.checkInteger(val, "timeout"); 749 if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) { 750 ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600); 751 } 752 coordJob.setTimeout(ival); 753 754 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 755 if (val == null || val.isEmpty()) { 756 val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "1"); 757 } 758 ival = ParamChecker.checkInteger(val, "concurrency"); 759 coordJob.setConcurrency(ival); 760 761 val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 762 if (val == null || val.isEmpty()) { 763 int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12); 764 ival = defaultThrottle; 765 } 766 else { 767 ival = ParamChecker.checkInteger(val, "throttle"); 768 } 769 int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000); 770 float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f); 771 int maxThrottle = (int) (maxQueue * factor); 772 if (ival > maxThrottle || ival < 1) { 773 ival = maxThrottle; 774 } 775 LOG.debug("max throttle " + ival); 776 coordJob.setMatThrottling(ival); 777 778 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 779 if (val == "") { 780 val = Execution.FIFO.toString(); 781 } 782 coordJob.setExecutionOrder(Execution.valueOf(val)); 783 String[] acceptedVals = { 
Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString(), 784 Execution.NONE.toString()}; 785 ParamChecker.isMember(val, acceptedVals, "execution"); 786 787 // datasets 788 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs); 789 // for each data set 790 resolveDataSets(eAppXml); 791 HashMap<String, String> dataNameList = new HashMap<String, String>(); 792 resolveIODataset(eAppXml); 793 resolveIOEvents(eAppXml, dataNameList); 794 795 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 796 eAppXml.getNamespace()), evalNofuncs); 797 // TODO: If action or workflow tag is missing, NullPointerException will 798 // occur 799 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 800 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace()); 801 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList); 802 if (configElem != null) { 803 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 804 resolveTagContents("name", propElem, evalData); 805 // Want to check the data-integrity but don't want to modify the 806 // XML 807 // for properties only 808 Element tmpProp = (Element) propElem.clone(); 809 resolveTagContents("value", tmpProp, evalData); 810 } 811 } 812 evalSla = CoordELEvaluator.createELEvaluatorForDataAndConf(conf, "coord-sla-submit", dataNameList); 813 resolveSLA(eAppXml, coordJob); 814 return eAppXml; 815 } 816 817 /** 818 * Resolve SLA events 819 * 820 * @param eAppXml job XML 821 * @param coordJob coordinator job bean 822 * @throws CommandException thrown if failed to resolve sla events 823 */ 824 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException { 825 Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace())); 826 
827 if (eSla != null) { 828 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 829 try { 830 // EL evaluation 831 slaXml = evalSla.evaluate(slaXml, String.class); 832 // Validate against semantic SXD 833 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 834 } 835 catch (Exception e) { 836 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e); 837 } 838 } 839 } 840 841 /** 842 * Resolve input-events/data-in and output-events/data-out tags. 843 * 844 * @param eJobOrg : Job element 845 * @throws CoordinatorJobException thrown if failed to resolve input and output events 846 */ 847 @SuppressWarnings("unchecked") 848 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException { 849 // Resolving input-events/data-in 850 // Clone the job and don't update anything in the original 851 Element eJob = (Element) eJobOrg.clone(); 852 Element inputList = eJob.getChild("input-events", eJob.getNamespace()); 853 if (inputList != null) { 854 TreeSet<String> eventNameSet = new TreeSet<String>(); 855 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) { 856 String dataInName = dataIn.getAttributeValue("name"); 857 dataNameList.put(dataInName, "data-in"); 858 // check whether there is any duplicate data-in name 859 if (eventNameSet.contains(dataInName)) { 860 throw new RuntimeException("Duplicate dataIn name " + dataInName); 861 } 862 else { 863 eventNameSet.add(dataInName); 864 } 865 resolveTagContents("instance", dataIn, evalInst); 866 resolveTagContents("start-instance", dataIn, evalInst); 867 resolveTagContents("end-instance", dataIn, evalInst); 868 869 } 870 } 871 // Resolving output-events/data-out 872 Element outputList = eJob.getChild("output-events", eJob.getNamespace()); 873 if (outputList != null) { 874 TreeSet<String> eventNameSet = new TreeSet<String>(); 875 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", 
eJob.getNamespace())) { 876 String dataOutName = dataOut.getAttributeValue("name"); 877 dataNameList.put(dataOutName, "data-out"); 878 // check whether there is any duplicate data-out name 879 if (eventNameSet.contains(dataOutName)) { 880 throw new RuntimeException("Duplicate dataIn name " + dataOutName); 881 } 882 else { 883 eventNameSet.add(dataOutName); 884 } 885 resolveTagContents("instance", dataOut, evalInst); 886 887 } 888 } 889 890 } 891 892 /** 893 * Resolve input-events/dataset and output-events/dataset tags. 894 * 895 * @param eJob : Job element 896 * @throws CoordinatorJobException thrown if failed to resolve input and output events 897 */ 898 @SuppressWarnings("unchecked") 899 private void resolveIODataset(Element eAppXml) throws CoordinatorJobException { 900 // Resolving input-events/data-in 901 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 902 if (inputList != null) { 903 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 904 resolveAttribute("dataset", dataIn, evalInst); 905 906 } 907 } 908 // Resolving output-events/data-out 909 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 910 if (outputList != null) { 911 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 912 resolveAttribute("dataset", dataOut, evalInst); 913 914 } 915 } 916 917 } 918 919 920 /** 921 * Add an attribute into XML element. 922 * 923 * @param attrName :attribute name 924 * @param elem : Element to add attribute 925 * @param value :Value of attribute 926 */ 927 private void addAnAttribute(String attrName, Element elem, String value) { 928 elem.setAttribute(attrName, value); 929 } 930 931 /** 932 * Resolve datasets using job configuration. 
933 * 934 * @param eAppXml : Job Element XML 935 * @throws Exception thrown if failed to resolve datasets 936 */ 937 @SuppressWarnings("unchecked") 938 private void resolveDataSets(Element eAppXml) throws Exception { 939 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace()); 940 if (datasetList != null) { 941 942 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace()); 943 resolveDataSets(dsElems); 944 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 945 eAppXml.getNamespace()), evalNofuncs); 946 } 947 } 948 949 /** 950 * Resolve datasets using job configuration. 951 * 952 * @param dsElems : Data set XML element. 953 * @throws CoordinatorJobException thrown if failed to resolve datasets 954 */ 955 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException { 956 for (Element dsElem : dsElems) { 957 // Setting up default TimeUnit and EndOFDuraion 958 evalFreq.setVariable("timeunit", TimeUnit.MINUTE); 959 evalFreq.setVariable("endOfDuration", TimeUnit.NONE); 960 961 String val = resolveAttribute("frequency", dsElem, evalFreq); 962 int ival = ParamChecker.checkInteger(val, "frequency"); 963 ParamChecker.checkGTZero(ival, "frequency"); 964 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE 965 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString()); 966 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? 
TimeUnit.NONE 967 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString()); 968 val = resolveAttribute("initial-instance", dsElem, evalNofuncs); 969 ParamChecker.checkDateOozieTZ(val, "initial-instance"); 970 checkInitialInstance(val); 971 val = resolveAttribute("timezone", dsElem, evalNofuncs); 972 ParamChecker.checkTimeZone(val, "timezone"); 973 resolveTagContents("uri-template", dsElem, evalNofuncs); 974 resolveTagContents("done-flag", dsElem, evalNofuncs); 975 } 976 } 977 978 /** 979 * Resolve the content of a tag. 980 * 981 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout> 982 * @param elem : Element where the tag exists. 983 * @param eval : EL evealuator 984 * @return Resolved tag content. 985 * @throws CoordinatorJobException thrown if failed to resolve tag content 986 */ 987 @SuppressWarnings("unchecked") 988 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 989 String ret = ""; 990 if (elem != null) { 991 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) { 992 if (tagElem != null) { 993 String updated; 994 try { 995 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim()); 996 997 } 998 catch (Exception e) { 999 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 1000 } 1001 tagElem.removeContent(); 1002 tagElem.addContent(updated); 1003 ret += updated; 1004 } 1005 } 1006 } 1007 return ret; 1008 } 1009 1010 /** 1011 * Resolve an attribute value. 1012 * 1013 * @param attrName : Attribute name. 
     * @param elem : XML Element where attribute is defined
     * @param eval : ELEvaluator used to resolve
     * @return Resolved attribute value, or null when the attribute is absent
     * @throws CoordinatorJobException thrown if failed to resolve an attribute value
     */
    private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
        Attribute attr = elem.getAttribute(attrName);
        String val = null;
        if (attr != null) {
            try {
                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
            }
            catch (Exception e) {
                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
            }
            // Write the resolved value back into the XML attribute
            attr.setValue(val);
        }
        return val;
    }

    /**
     * Include referred datasets into XML.
     *
     * @param resolvedXml : Job XML element.
     * @param conf : Job configuration
     * @throws CoordinatorJobException thrown if failed to include referred datasets into XML
     */
    @SuppressWarnings("unchecked")
    protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException {
        Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace());
        // Accumulates every dataset definition (inline and included) under one element
        Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace());
        List<String> dsList = new ArrayList<String>();
        if (datasets != null) {
            // First pull in datasets from every <include> file
            for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) {
                String incDSFile = includeElem.getTextTrim();
                includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace());
            }
            // Inline datasets override included ones with the same name
            for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) {
                String dsName = e.getAttributeValue("name");
                if (dsList.contains(dsName)) {// Override with this DS
                    // Remove duplicate
                    removeDataSet(allDataSets, dsName);
                }
                else {
                    dsList.add(dsName);
                }
                allDataSets.addContent((Element) e.clone());
            }
        }
        insertDataSet(resolvedXml, allDataSets);
        resolvedXml.removeChild("datasets", resolvedXml.getNamespace());
    }

    /**
     * Include one dataset file.
     *
     * @param incDSFile : Include data set filename.
     * @param dsList : List of dataset names to verify the duplicate.
     * @param allDataSets : Element that includes all dataset definitions.
     * @param dsNameSpace : Data set name space
     * @throws CoordinatorJobException thrown if failed to include one dataset file
     */
    @SuppressWarnings("unchecked")
    private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace)
            throws CoordinatorJobException {
        Element tmpDataSets = null;
        try {
            String dsXml = readDefinition(incDSFile);
            LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml);
            tmpDataSets = XmlUtils.parseXml(dsXml);
        }
        catch (JDOMException e) {
            // NOTE(review): the JDOMException is not chained into the thrown exception here —
            // confirm whether this CoordinatorJobException constructor can carry the cause
            LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage());
            throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage());
        }
        resolveDataSets(tmpDataSets.getChildren("dataset"));
        for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) {
            String dsName = e.getAttributeValue("name");
            if (dsList.contains(dsName)) {
                throw new RuntimeException("Duplicate Dataset " + dsName);
            }
            dsList.add(dsName);
            Element tmp = (Element) e.clone();
            // TODO: Don't like to over-write the external/include DS's namespace
            tmp.setNamespace(dsNameSpace);
            tmp.getChild("uri-template").setNamespace(dsNameSpace);
            if (e.getChild("done-flag") != null) {
                tmp.getChild("done-flag").setNamespace(dsNameSpace);
            }
            allDataSets.addContent(tmp);
        }
        // nested include: recurse into includes declared by the included file itself
        for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) {
            String incFile = includeElem.getTextTrim();
includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace); 1109 } 1110 } 1111 1112 /** 1113 * Remove a dataset from a list of dataset. 1114 * 1115 * @param eDatasets : List of dataset 1116 * @param name : Dataset name to be removed. 1117 */ 1118 @SuppressWarnings("unchecked") 1119 private static void removeDataSet(Element eDatasets, String name) { 1120 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 1121 if (eDataset.getAttributeValue("name").equals(name)) { 1122 eDataset.detach(); 1123 return; 1124 } 1125 } 1126 throw new RuntimeException("undefined dataset: " + name); 1127 } 1128 1129 /** 1130 * Read coordinator definition. 1131 * 1132 * @param appPath application path. 1133 * @return coordinator definition. 1134 * @throws CoordinatorJobException thrown if the definition could not be read. 1135 */ 1136 protected String readDefinition(String appPath) throws CoordinatorJobException { 1137 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 1138 // Configuration confHadoop = CoordUtils.getHadoopConf(conf); 1139 try { 1140 URI uri = new URI(appPath); 1141 LOG.debug("user =" + user); 1142 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 1143 Configuration fsConf = has.createJobConf(uri.getAuthority()); 1144 FileSystem fs = has.createFileSystem(user, uri, fsConf); 1145 Path appDefPath = null; 1146 1147 // app path could be a directory 1148 Path path = new Path(uri.getPath()); 1149 // check file exists for dataset include file, app xml already checked 1150 if (!fs.exists(path)) { 1151 throw new URISyntaxException(path.toString(), "path not existed : " + path.toString()); 1152 } 1153 if (!fs.isFile(path)) { 1154 appDefPath = new Path(path, COORDINATOR_XML_FILE); 1155 } else { 1156 appDefPath = path; 1157 } 1158 1159 Reader reader = new InputStreamReader(fs.open(appDefPath)); 1160 StringWriter writer = new StringWriter(); 1161 
IOUtils.copyCharStream(reader, writer); 1162 return writer.toString(); 1163 } 1164 catch (IOException ex) { 1165 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 1166 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1167 } 1168 catch (URISyntaxException ex) { 1169 LOG.warn("URISyException :" + ex.getMessage()); 1170 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex); 1171 } 1172 catch (HadoopAccessorException ex) { 1173 throw new CoordinatorJobException(ex); 1174 } 1175 catch (Exception ex) { 1176 LOG.warn("Exception :", ex); 1177 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1178 } 1179 } 1180 1181 /** 1182 * Write a coordinator job into database 1183 * 1184 *@param appXML : Coordinator definition xml 1185 * @param eJob : XML element of job 1186 * @param coordJob : Coordinator job bean 1187 * @return Job id 1188 * @throws CommandException thrown if unable to save coordinator job to db 1189 */ 1190 protected String storeToDB(String appXML, Element eJob, CoordinatorJobBean coordJob) throws CommandException { 1191 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR); 1192 coordJob.setId(jobId); 1193 1194 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH)); 1195 coordJob.setCreatedTime(new Date()); 1196 coordJob.setUser(conf.get(OozieClient.USER_NAME)); 1197 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 1198 coordJob.setGroup(group); 1199 coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 1200 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString()); 1201 coordJob.setLastActionNumber(0); 1202 coordJob.setLastModifiedTime(new Date()); 1203 1204 if (!dryrun) { 1205 coordJob.setLastModifiedTime(new Date()); 1206 try { 1207 CoordJobQueryExecutor.getInstance().insert(coordJob); 1208 } 1209 catch (JPAExecutorException jpaee) { 1210 coordJob.setId(null); 1211 
                coordJob.setStatus(CoordinatorJob.Status.FAILED);
                throw new CommandException(jpaee);
            }
        }
        return jobId;
    }

    /*
     * Checks that the initial-instance specified for a particular dataset is not a
     * date earlier than the Oozie server default of Jan 01, 1970 00:00Z UTC.
     */
    private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException {
        Date initialInstance, givenInstance;
        try {
            initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z");
            givenInstance = DateUtils.parseDateOozieTZ(val);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val +
                    "' to Date object. ",e);
        }
        if(givenInstance.compareTo(initialInstance) < 0) {
            throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val +
                    " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance));
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        // Returns null: no entity key for this command (locking is disabled, see isLockRequired)
        return null;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
        coordJob = new CoordinatorJobBean();
        if (this.bundleId != null) {
            // this coord job is created from bundle
            coordJob.setBundleId(this.bundleId);
            // first use bundle id if submit thru bundle
            logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId);
            LogUtils.setLogInfo(logInfo);
        }
        if (this.coordName != null) {
            // this coord job is created from bundle; use the bundle-supplied app name
            coordJob.setAppName(this.coordName);
        }
        setJob(coordJob);

    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException {

    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() throws CommandException {
        // update bundle action only when this coordinator was submitted through a bundle
        if (coordJob.getBundleId() != null) {
            LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId());
            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
            bundleStatusUpdate.call();
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() throws CommandException {
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }

    @Override
    public void performWrites() throws CommandException {
    }
}