001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.Reader; 024import java.io.StringReader; 025import java.io.StringWriter; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.util.ArrayList; 029import java.util.Calendar; 030import java.util.Date; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Map; 036import java.util.Set; 037import java.util.TreeSet; 038 039import javax.xml.transform.stream.StreamSource; 040import javax.xml.validation.Validator; 041 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.oozie.CoordinatorJobBean; 046import org.apache.oozie.ErrorCode; 047import org.apache.oozie.client.CoordinatorJob; 048import org.apache.oozie.client.Job; 049import org.apache.oozie.client.OozieClient; 050import org.apache.oozie.client.CoordinatorJob.Execution; 051import org.apache.oozie.command.CommandException; 052import 
org.apache.oozie.command.SubmitTransitionXCommand; 053import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 054import org.apache.oozie.coord.CoordELEvaluator; 055import org.apache.oozie.coord.CoordELFunctions; 056import org.apache.oozie.coord.CoordUtils; 057import org.apache.oozie.coord.CoordinatorJobException; 058import org.apache.oozie.coord.TimeUnit; 059import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator; 060import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 061import org.apache.oozie.executor.jpa.JPAExecutorException; 062import org.apache.oozie.service.CoordMaterializeTriggerService; 063import org.apache.oozie.service.ConfigurationService; 064import org.apache.oozie.service.HadoopAccessorException; 065import org.apache.oozie.service.HadoopAccessorService; 066import org.apache.oozie.service.JPAService; 067import org.apache.oozie.service.SchemaService; 068import org.apache.oozie.service.Service; 069import org.apache.oozie.service.Services; 070import org.apache.oozie.service.UUIDService; 071import org.apache.oozie.service.SchemaService.SchemaName; 072import org.apache.oozie.service.UUIDService.ApplicationType; 073import org.apache.oozie.util.ConfigUtils; 074import org.apache.oozie.util.DateUtils; 075import org.apache.oozie.util.ELEvaluator; 076import org.apache.oozie.util.ELUtils; 077import org.apache.oozie.util.IOUtils; 078import org.apache.oozie.util.InstrumentUtils; 079import org.apache.oozie.util.LogUtils; 080import org.apache.oozie.util.ParamChecker; 081import org.apache.oozie.util.ParameterVerifier; 082import org.apache.oozie.util.ParameterVerifierException; 083import org.apache.oozie.util.PropertiesUtils; 084import org.apache.oozie.util.XConfiguration; 085import org.apache.oozie.util.XmlUtils; 086import org.jdom.Attribute; 087import org.jdom.Element; 088import org.jdom.JDOMException; 089import org.jdom.Namespace; 090import org.xml.sax.SAXException; 091 092/** 093 * This class provides the functionalities to resolve a 
coordinator job XML and write the job information into a DB
 * table.
 * <p>
 * Specifically it performs the following functions:
 * <ol>
 * <li>Resolve all the variables or properties using job configurations.</li>
 * <li>Insert all dataset definitions as part of the {@code <data-in>} and {@code <data-out>} tags.</li>
 * <li>Validate the XML at runtime.</li>
 * </ol>
 */
public class CoordSubmitXCommand extends SubmitTransitionXCommand {

    // Job configuration; mergeDefaultConfig() replaces it with a copy whose values were read back through conf.get().
    protected Configuration conf;
    // Both are null when the command is built through the public constructors (i.e. not by a bundle).
    protected final String bundleId;
    protected final String coordName;
    // When true, submitJob() returns the getDryRun() output instead of queueing materialization.
    protected boolean dryrun;
    protected JPAService jpaService = null;
    private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;

    public static final String CONFIG_DEFAULT = "coord-config-default.xml";
    public static final String COORDINATOR_XML_FILE = "coordinator.xml";
    // NOTE(review): the four names below are instance fields (no 'static') although they are named like constants.
    public final String COORD_INPUT_EVENTS ="input-events";
    public final String COORD_OUTPUT_EVENTS = "output-events";
    public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
    public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";

    // Property names checked against the user configuration / the coord-config-default.xml in mergeDefaultConfig().
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

    // Bean populated during submission; it is not assigned anywhere in this part of the class.
    protected CoordinatorJobBean coordJob = null;
    /**
     * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
     */
    public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";

    /**
     * Configuration key read by resolveInitial() when the coordinator XML gives no concurrency value.
     */
    public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";

    /**
     * Configuration key read by resolveInitial() when the coordinator XML gives no throttle value.
     */
    public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";

    /**
     * Configuration key of the factor that resolveInitial() multiplies with the value of
     * {@link #CONF_QUEUE_SIZE} to obtain the maximum throttle.
     */
    public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
            + "coord.materialization.throttling.factor";

    /**
     * Default MAX timeout in minutes, after which coordinator input check will timeout
     */
    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";

    /**
     * Configuration key of the queue size used by resolveInitial() to cap the throttle.
     */
    public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";

    /**
     * Boolean configuration key; when true, validateCoordinatorJob() rejects a numeric frequency of 0 or of
     * less than 5 minutes.
     */
    public static final String CONF_CHECK_MAX_FREQUENCY = Service.CONF_PREFIX + "coord.check.maximum.frequency";

    // evalFreq, evalNofuncs, evalInst, evalAction, evalTimeout and evalInitialInstance are created in
    // initEvaluators(); evalData and evalSla are created in resolveInitial().
    private ELEvaluator evalFreq = null;
    private ELEvaluator evalNofuncs = null;
    private ELEvaluator evalData = null;
    private ELEvaluator evalInst = null;
    private ELEvaluator evalAction = null;
    private ELEvaluator evalSla = null;
    private ELEvaluator evalTimeout = null;
    private ELEvaluator evalInitialInstance = null;

    static {
        // Names handed to PropertiesUtils.createPropertySet for BOTH disallowed-property sets.
        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        // HADOOP_USER is handed over for the default-config set only.
        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Constructor to create the Coordinator Submit Command.
     * <p>
     * Leaves {@code bundleId} and {@code coordName} null.
     *
     * @param conf : Configuration for Coordinator job; passed through {@code ParamChecker.notNull}
     */
    public CoordSubmitXCommand(Configuration conf) {
        super("coord_submit", "coord_submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
        this.bundleId = null;
        this.coordName = null;
    }

    /**
     * Constructor to create the Coordinator Submit Command by bundle job.
177 * 178 * @param conf : Configuration for Coordinator job 179 * @param bundleId : bundle id 180 * @param coordName : coord name 181 */ 182 protected CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) { 183 super("coord_submit", "coord_submit", 1); 184 this.conf = ParamChecker.notNull(conf, "conf"); 185 this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId"); 186 this.coordName = ParamChecker.notEmpty(coordName, "coordName"); 187 } 188 189 /** 190 * Constructor to create the Coordinator Submit Command. 191 * 192 * @param dryrun : if dryrun 193 * @param conf : Configuration for Coordinator job 194 */ 195 public CoordSubmitXCommand(boolean dryrun, Configuration conf) { 196 this(conf); 197 this.dryrun = dryrun; 198 } 199 200 /* (non-Javadoc) 201 * @see org.apache.oozie.command.XCommand#execute() 202 */ 203 @Override 204 protected String submit() throws CommandException { 205 LOG.info("STARTED Coordinator Submit"); 206 String jobId = submitJob(); 207 LOG.info("ENDED Coordinator Submit jobId=" + jobId); 208 return jobId; 209 } 210 211 protected String submitJob() throws CommandException { 212 String jobId = null; 213 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 214 215 boolean exceptionOccured = false; 216 try { 217 mergeDefaultConfig(); 218 219 String appXml = readAndValidateXml(); 220 coordJob.setOrigJobXml(appXml); 221 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 222 223 Element eXml = XmlUtils.parseXml(appXml); 224 225 String appNamespace = readAppNamespace(eXml); 226 coordJob.setAppNamespace(appNamespace); 227 228 ParameterVerifier.verifyParameters(conf, eXml); 229 230 appXml = XmlUtils.removeComments(appXml); 231 initEvaluators(); 232 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob); 233 234 validateCoordinatorJob(); 235 236 // checking if the coordinator application data input/output events 237 // specify multiple data instance values in erroneous manner 
238 checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN); 239 checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT); 240 241 LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString()); 242 243 jobId = storeToDB(appXml, eJob, coordJob); 244 // log job info for coordinator job 245 LogUtils.setLogInfo(coordJob); 246 247 if (!dryrun) { 248 queueMaterializeTransitionXCommand(jobId); 249 } 250 else { 251 return getDryRun(coordJob); 252 } 253 } 254 catch (JDOMException jex) { 255 exceptionOccured = true; 256 LOG.warn("ERROR: ", jex); 257 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex); 258 } 259 catch (CoordinatorJobException cex) { 260 exceptionOccured = true; 261 LOG.warn("ERROR: ", cex); 262 throw new CommandException(cex); 263 } 264 catch (ParameterVerifierException pex) { 265 exceptionOccured = true; 266 LOG.warn("ERROR: ", pex); 267 throw new CommandException(pex); 268 } 269 catch (IllegalArgumentException iex) { 270 exceptionOccured = true; 271 LOG.warn("ERROR: ", iex); 272 throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex); 273 } 274 catch (Exception ex) { 275 exceptionOccured = true; 276 LOG.warn("ERROR: ", ex); 277 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 278 } 279 finally { 280 if (exceptionOccured) { 281 if (coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")) { 282 coordJob.setStatus(CoordinatorJob.Status.FAILED); 283 coordJob.resetPending(); 284 } 285 } 286 } 287 return jobId; 288 } 289 290 /** 291 * Gets the dryrun output. 
292 * 293 * @param coordJob the coordinatorJobBean 294 * @return the dry run 295 * @throws Exception the exception 296 */ 297 protected String getDryRun(CoordinatorJobBean coordJob) throws Exception{ 298 int materializationWindow = ConfigurationService 299 .getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW); 300 Date startTime = coordJob.getStartTime(); 301 long startTimeMilli = startTime.getTime(); 302 long endTimeMilli = startTimeMilli + (materializationWindow * 1000); 303 Date jobEndTime = coordJob.getEndTime(); 304 Date endTime = new Date(endTimeMilli); 305 if (endTime.compareTo(jobEndTime) > 0) { 306 endTime = jobEndTime; 307 } 308 String jobId = coordJob.getId(); 309 LOG.info("[" + jobId + "]: Update status to RUNNING"); 310 coordJob.setStatus(Job.Status.RUNNING); 311 coordJob.setPending(); 312 Configuration jobConf = null; 313 try { 314 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 315 } 316 catch (IOException e1) { 317 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1); 318 } 319 String action = new CoordMaterializeTransitionXCommand(coordJob, materializationWindow, startTime, 320 endTime).materializeActions(true); 321 String output = coordJob.getJobXml() + System.getProperty("line.separator") 322 + "***actions for instance***" + action; 323 return output; 324 } 325 326 /** 327 * Queue MaterializeTransitionXCommand 328 */ 329 protected void queueMaterializeTransitionXCommand(String jobId) { 330 int materializationWindow = ConfigurationService 331 .getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW); 332 queue(new CoordMaterializeTransitionXCommand(jobId, materializationWindow), 100); 333 } 334 335 /** 336 * Method that validates values in the definition for correctness. Placeholder to add more. 
337 */ 338 private void validateCoordinatorJob() throws Exception { 339 // check if startTime < endTime 340 if (!coordJob.getStartTime().before(coordJob.getEndTime())) { 341 throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time."); 342 } 343 344 try { 345 // Check if a coord job with cron frequency will materialize actions 346 int freq = Integer.parseInt(coordJob.getFrequency()); 347 348 // Check if the frequency is faster than 5 min if enabled 349 if (ConfigurationService.getBoolean(CONF_CHECK_MAX_FREQUENCY)) { 350 CoordinatorJob.Timeunit unit = coordJob.getTimeUnit(); 351 if (freq == 0 || (freq < 5 && unit == CoordinatorJob.Timeunit.MINUTE)) { 352 throw new IllegalArgumentException("Coordinator job with frequency [" + freq + 353 "] minutes is faster than allowed maximum of 5 minutes (" 354 + CONF_CHECK_MAX_FREQUENCY + " is set to true)"); 355 } 356 } 357 } catch (NumberFormatException e) { 358 Date start = coordJob.getStartTime(); 359 Calendar cal = Calendar.getInstance(); 360 cal.setTime(start); 361 cal.add(Calendar.MINUTE, -1); 362 start = cal.getTime(); 363 364 Date nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(start, coordJob); 365 if (nextTime == null) { 366 throw new IllegalArgumentException("Invalid coordinator cron frequency: " + coordJob.getFrequency()); 367 } 368 if (!nextTime.before(coordJob.getEndTime())) { 369 throw new IllegalArgumentException("Coordinator job with frequency '" + 370 coordJob.getFrequency() + "' materializes no actions between start and end time."); 371 } 372 } 373 } 374 375 /* 376 * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag 377 * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list 378 */ 379 private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException { 380 
Element eventsSpec, dataSpec, instance; 381 List<Element> instanceSpecList; 382 Namespace ns = eCoordJob.getNamespace(); 383 String instanceValue; 384 eventsSpec = eCoordJob.getChild(eventType, ns); 385 if (eventsSpec != null) { 386 dataSpec = eventsSpec.getChild(dataType, ns); 387 if (dataSpec != null) { 388 // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors 389 instanceSpecList = dataSpec.getChildren("instance", ns); 390 Iterator instanceIter = instanceSpecList.iterator(); 391 while(instanceIter.hasNext()) { 392 instance = ((Element) instanceIter.next()); 393 if(instance.getContentSize() == 0) { //empty string or whitespace 394 throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!"); 395 } 396 instanceValue = instance.getContent(0).toString(); 397 boolean isInvalid = false; 398 try { 399 isInvalid = evalAction.checkForExistence(instanceValue, ","); 400 } catch (Exception e) { 401 handleELParseException(eventType, dataType, instanceValue); 402 } 403 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0 404 handleExpresionWithMultipleInstances(eventType, dataType, instanceValue); 405 } 406 } 407 408 // In case of input-events, there can be multiple child <start-instance> datasets. 
Iterating to ensure none of them have errors 409 instanceSpecList = dataSpec.getChildren("start-instance", ns); 410 instanceIter = instanceSpecList.iterator(); 411 while(instanceIter.hasNext()) { 412 instance = ((Element) instanceIter.next()); 413 if(instance.getContentSize() == 0) { //empty string or whitespace 414 throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!"); 415 } 416 instanceValue = instance.getContent(0).toString(); 417 boolean isInvalid = false; 418 try { 419 isInvalid = evalAction.checkForExistence(instanceValue, ","); 420 } catch (Exception e) { 421 handleELParseException(eventType, dataType, instanceValue); 422 } 423 if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0 424 handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue); 425 } 426 } 427 428 // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors 429 instanceSpecList = dataSpec.getChildren("end-instance", ns); 430 instanceIter = instanceSpecList.iterator(); 431 while(instanceIter.hasNext()) { 432 instance = ((Element) instanceIter.next()); 433 if(instance.getContentSize() == 0) { //empty string or whitespace 434 throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!"); 435 } 436 instanceValue = instance.getContent(0).toString(); 437 boolean isInvalid = false; 438 try { 439 isInvalid = evalAction.checkForExistence(instanceValue, ","); 440 } catch (Exception e) { 441 handleELParseException(eventType, dataType, instanceValue); 442 } 443 if (isInvalid) { // reaching this block implies instance is not empty i.e. 
length > 0 444 handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue); 445 } 446 } 447 448 } 449 } 450 } 451 452 private void handleELParseException(String eventType, String dataType, String instanceValue) 453 throws CoordinatorJobException { 454 String correctAction = null; 455 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 456 correctAction = "Coordinator app definition should have valid <instance> tag for data-in"; 457 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 458 correctAction = "Coordinator app definition should have valid <instance> tag for data-out"; 459 } 460 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 461 + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction); 462 } 463 464 private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue) 465 throws CoordinatorJobException { 466 String correctAction = null; 467 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 468 correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance"; 469 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 470 correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance"; 471 } 472 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 473 + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction); 474 } 475 476 private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue) 477 throws CoordinatorJobException { 478 String correctAction = "Coordinator app definition should not have multiple start-instances"; 479 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue 480 + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. 
" + correctAction); 481 } 482 483 private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue) 484 throws CoordinatorJobException { 485 String correctAction = "Coordinator app definition should not have multiple end-instances"; 486 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue 487 + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction); 488 } 489 /** 490 * Read the application XML and validate against coordinator Schema 491 * 492 * @return validated coordinator XML 493 * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml 494 */ 495 protected String readAndValidateXml() throws CoordinatorJobException { 496 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH), 497 OozieClient.COORDINATOR_APP_PATH); 498 String coordXml = readDefinition(appPath); 499 validateXml(coordXml); 500 return coordXml; 501 } 502 503 /** 504 * Validate against Coordinator XSD file 505 * 506 * @param xmlContent : Input coordinator xml 507 * @throws CoordinatorJobException thrown if unable to validate coordinator xml 508 */ 509 private void validateXml(String xmlContent) throws CoordinatorJobException { 510 try { 511 Validator validator = Services.get().get(SchemaService.class).getValidator(SchemaName.COORDINATOR); 512 validator.validate(new StreamSource(new StringReader(xmlContent))); 513 } 514 catch (SAXException ex) { 515 LOG.warn("SAXException :", ex); 516 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex); 517 } 518 catch (IOException ex) { 519 LOG.warn("IOException :", ex); 520 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex); 521 } 522 } 523 524 /** 525 * Read the application XML schema namespace 526 * 527 * @param coordXmlElement input coordinator xml Element 528 * @return app xml namespace 529 * @throws CoordinatorJobException 530 */ 531 private 
String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException { 532 Namespace ns = coordXmlElement.getNamespace(); 533 if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 534 throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace " 535 + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later"); 536 } 537 if (ns != null) { 538 return ns.getURI(); 539 } 540 else { 541 throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given"); 542 } 543 } 544 545 /** 546 * Merge default configuration with user-defined configuration. 547 * 548 * @throws CommandException thrown if failed to read or merge configurations 549 */ 550 protected void mergeDefaultConfig() throws CommandException { 551 Path configDefault = null; 552 try { 553 String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH); 554 Path coordAppPath = new Path(coordAppPathStr); 555 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 556 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 557 Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority()); 558 FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf); 559 560 // app path could be a directory 561 if (!fs.isFile(coordAppPath)) { 562 configDefault = new Path(coordAppPath, CONFIG_DEFAULT); 563 } else { 564 configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT); 565 } 566 567 if (fs.exists(configDefault)) { 568 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 569 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 570 XConfiguration.injectDefaults(defaultConf, conf); 571 } 572 else { 573 LOG.info("configDefault Doesn't exist " + configDefault); 574 } 575 PropertiesUtils.checkDisallowedProperties(conf, 
DISALLOWED_USER_PROPERTIES); 576 577 // Resolving all variables in the job properties. 578 // This ensures the Hadoop Configuration semantics is preserved. 579 XConfiguration resolvedVarsConf = new XConfiguration(); 580 for (Map.Entry<String, String> entry : conf) { 581 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 582 } 583 conf = resolvedVarsConf; 584 } 585 catch (IOException e) { 586 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 587 + configDefault, e); 588 } 589 catch (HadoopAccessorException e) { 590 throw new CommandException(e); 591 } 592 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 593 } 594 595 /** 596 * The method resolve all the variables that are defined in configuration. It also include the data set definition 597 * from dataset file into XML. 598 * 599 * @param appXml : Original job XML 600 * @param conf : Configuration of the job 601 * @param coordJob : Coordinator job bean to be populated. 602 * @return Resolved and modified job XML element. 603 * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets 604 * @throws Exception thrown if failed to resolve basic entities or include referred datasets 605 */ 606 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob) 607 throws CoordinatorJobException, Exception { 608 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob); 609 includeDataSets(basicResolvedApp, conf); 610 return basicResolvedApp; 611 } 612 613 /** 614 * Insert data set into data-in and data-out tags. 
615 * 616 * @param eAppXml : coordinator application XML 617 * @param eDatasets : DataSet XML 618 */ 619 @SuppressWarnings("unchecked") 620 private void insertDataSet(Element eAppXml, Element eDatasets) { 621 // Adding DS definition in the coordinator XML 622 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 623 if (inputList != null) { 624 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 625 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset")); 626 dataIn.getContent().add(0, eDataset); 627 } 628 } 629 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 630 if (outputList != null) { 631 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 632 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset")); 633 dataOut.getContent().add(0, eDataset); 634 } 635 } 636 } 637 638 /** 639 * Find a specific dataset from a list of Datasets. 640 * 641 * @param eDatasets : List of data sets 642 * @param name : queried data set name 643 * @return one Dataset element. otherwise throw Exception 644 */ 645 @SuppressWarnings("unchecked") 646 private static Element findDataSet(Element eDatasets, String name) { 647 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 648 if (eDataset.getAttributeValue("name").equals(name)) { 649 eDataset = (Element) eDataset.clone(); 650 eDataset.detach(); 651 return eDataset; 652 } 653 } 654 throw new RuntimeException("undefined dataset: " + name); 655 } 656 657 /** 658 * Initialize all the required EL Evaluators. 
659 */ 660 protected void initEvaluators() { 661 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq"); 662 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs"); 663 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances"); 664 evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start"); 665 evalTimeout = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-wait-timeout"); 666 evalInitialInstance = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-initial-instance"); 667 668 } 669 670 /** 671 * Resolve basic entities using job Configuration. 672 * 673 * @param conf :Job configuration 674 * @param appXml : Original job XML 675 * @param coordJob : Coordinator job bean to be populated. 676 * @return Resolved job XML element. 677 * @throws CoordinatorJobException thrown if failed to resolve basic entities 678 * @throws Exception thrown if failed to resolve basic entities 679 */ 680 @SuppressWarnings("unchecked") 681 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob) 682 throws CoordinatorJobException, Exception { 683 Element eAppXml = XmlUtils.parseXml(appXml); 684 // job's main attributes 685 // frequency 686 String val = resolveAttribute("frequency", eAppXml, evalFreq); 687 int ival = 0; 688 689 val = ParamChecker.checkFrequency(val); 690 coordJob.setFrequency(val); 691 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq 692 .getVariable("timeunit")); 693 try { 694 Integer.parseInt(val); 695 } 696 catch (NumberFormatException ex) { 697 tmp=TimeUnit.CRON; 698 } 699 700 addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); 701 // TimeUnit 702 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString())); 703 // End Of Duration 704 tmp = evalFreq.getVariable("endOfDuration") == null ? 
TimeUnit.NONE : ((TimeUnit) evalFreq 705 .getVariable("endOfDuration")); 706 addAnAttribute("end_of_duration", eAppXml, tmp.toString()); 707 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean 708 709 // Application name 710 if (this.coordName == null) { 711 String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf); 712 coordJob.setAppName(name); 713 } 714 else { 715 // this coord job is created from bundle 716 coordJob.setAppName(this.coordName); 717 } 718 719 // start time 720 val = resolveAttribute("start", eAppXml, evalNofuncs); 721 ParamChecker.checkDateOozieTZ(val, "start"); 722 coordJob.setStartTime(DateUtils.parseDateOozieTZ(val)); 723 // end time 724 val = resolveAttribute("end", eAppXml, evalNofuncs); 725 ParamChecker.checkDateOozieTZ(val, "end"); 726 coordJob.setEndTime(DateUtils.parseDateOozieTZ(val)); 727 // Time zone 728 val = resolveAttribute("timezone", eAppXml, evalNofuncs); 729 ParamChecker.checkTimeZone(val, "timezone"); 730 coordJob.setTimeZone(val); 731 732 // controls 733 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalTimeout); 734 if (val != null && val != "") { 735 int t = Integer.parseInt(val); 736 tmp = (evalTimeout.getVariable("timeunit") == null) ? 
TimeUnit.MINUTE : ((TimeUnit) evalTimeout 737 .getVariable("timeunit")); 738 switch (tmp) { 739 case HOUR: 740 val = String.valueOf(t * 60); 741 break; 742 case DAY: 743 val = String.valueOf(t * 60 * 24); 744 break; 745 case MONTH: 746 val = String.valueOf(t * 60 * 24 * 30); 747 break; 748 default: 749 break; 750 } 751 } 752 else { 753 val = ConfigurationService.get(CONF_DEFAULT_TIMEOUT_NORMAL); 754 } 755 756 ival = ParamChecker.checkInteger(val, "timeout"); 757 if (ival < 0 || ival > ConfigurationService.getInt(CONF_DEFAULT_MAX_TIMEOUT)) { 758 ival = ConfigurationService.getInt(CONF_DEFAULT_MAX_TIMEOUT); 759 } 760 coordJob.setTimeout(ival); 761 762 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 763 if (val == null || val.isEmpty()) { 764 val = ConfigurationService.get(CONF_DEFAULT_CONCURRENCY); 765 } 766 ival = ParamChecker.checkInteger(val, "concurrency"); 767 coordJob.setConcurrency(ival); 768 769 val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 770 if (val == null || val.isEmpty()) { 771 int defaultThrottle = ConfigurationService.getInt(CONF_DEFAULT_THROTTLE); 772 ival = defaultThrottle; 773 } 774 else { 775 ival = ParamChecker.checkInteger(val, "throttle"); 776 } 777 int maxQueue = ConfigurationService.getInt(CONF_QUEUE_SIZE); 778 float factor = ConfigurationService.getFloat(CONF_MAT_THROTTLING_FACTOR); 779 int maxThrottle = (int) (maxQueue * factor); 780 if (ival > maxThrottle || ival < 1) { 781 ival = maxThrottle; 782 } 783 LOG.debug("max throttle " + ival); 784 coordJob.setMatThrottling(ival); 785 786 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 787 if (val == "") { 788 val = Execution.FIFO.toString(); 789 } 790 coordJob.setExecutionOrder(Execution.valueOf(val)); 791 String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString(), 792 
Execution.NONE.toString()}; 793 ParamChecker.isMember(val, acceptedVals, "execution"); 794 795 // datasets 796 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs); 797 // for each data set 798 resolveDataSets(eAppXml); 799 HashMap<String, String> dataNameList = new HashMap<String, String>(); 800 resolveIODataset(eAppXml); 801 resolveIOEvents(eAppXml, dataNameList); 802 803 if (CoordUtils.isInputLogicSpecified(eAppXml)) { 804 resolveInputLogic(eAppXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAppXml.getNamespace()), evalInst, 805 dataNameList); 806 } 807 808 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 809 eAppXml.getNamespace()), evalNofuncs); 810 // TODO: If action or workflow tag is missing, NullPointerException will 811 // occur 812 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 813 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace()); 814 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList); 815 if (configElem != null) { 816 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 817 resolveTagContents("name", propElem, evalData); 818 // Want to check the data-integrity but don't want to modify the 819 // XML 820 // for properties only 821 Element tmpProp = (Element) propElem.clone(); 822 resolveTagContents("value", tmpProp, evalData); 823 } 824 } 825 evalSla = CoordELEvaluator.createELEvaluatorForDataAndConf(conf, "coord-sla-submit", dataNameList); 826 resolveSLA(eAppXml, coordJob); 827 return eAppXml; 828 } 829 830 /** 831 * Resolve SLA events 832 * 833 * @param eAppXml job XML 834 * @param coordJob coordinator job bean 835 * @throws CommandException thrown if failed to resolve sla events 836 */ 837 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws 
CommandException { 838 Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace())); 839 840 if (eSla != null) { 841 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 842 try { 843 // EL evaluation 844 slaXml = evalSla.evaluate(slaXml, String.class); 845 // Validate against semantic SXD 846 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 847 } 848 catch (Exception e) { 849 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e); 850 } 851 } 852 } 853 854 /** 855 * Resolve input-events/data-in and output-events/data-out tags. 856 * 857 * @param eJobOrg : Job element 858 * @throws CoordinatorJobException thrown if failed to resolve input and output events 859 */ 860 @SuppressWarnings("unchecked") 861 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException { 862 // Resolving input-events/data-in 863 // Clone the job and don't update anything in the original 864 Element eJob = (Element) eJobOrg.clone(); 865 Element inputList = eJob.getChild("input-events", eJob.getNamespace()); 866 if (inputList != null) { 867 TreeSet<String> eventNameSet = new TreeSet<String>(); 868 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) { 869 String dataInName = dataIn.getAttributeValue("name"); 870 dataNameList.put(dataInName, "data-in"); 871 // check whether there is any duplicate data-in name 872 if (eventNameSet.contains(dataInName)) { 873 throw new RuntimeException("Duplicate dataIn name " + dataInName); 874 } 875 else { 876 eventNameSet.add(dataInName); 877 } 878 resolveTagContents("instance", dataIn, evalInst); 879 resolveTagContents("start-instance", dataIn, evalInst); 880 resolveTagContents("end-instance", dataIn, evalInst); 881 882 } 883 } 884 // Resolving output-events/data-out 885 Element outputList = eJob.getChild("output-events", eJob.getNamespace()); 886 if (outputList != null) { 887 TreeSet<String> 
eventNameSet = new TreeSet<String>(); 888 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) { 889 String dataOutName = dataOut.getAttributeValue("name"); 890 dataNameList.put(dataOutName, "data-out"); 891 // check whether there is any duplicate data-out name 892 if (eventNameSet.contains(dataOutName)) { 893 throw new RuntimeException("Duplicate dataIn name " + dataOutName); 894 } 895 else { 896 eventNameSet.add(dataOutName); 897 } 898 resolveTagContents("instance", dataOut, evalInst); 899 900 } 901 } 902 903 } 904 905 private void resolveInputLogic(Element root, ELEvaluator evalInputLogic, HashMap<String, String> dataNameList) 906 throws Exception { 907 for (Object event : root.getChildren()) { 908 Element inputElement = (Element) event; 909 resolveAttribute("dataset", inputElement, evalInputLogic); 910 String name=resolveAttribute("name", inputElement, evalInputLogic); 911 resolveAttribute("or", inputElement, evalInputLogic); 912 resolveAttribute("and", inputElement, evalInputLogic); 913 resolveAttribute("combine", inputElement, evalInputLogic); 914 915 if (name != null) { 916 dataNameList.put(name, "data-in"); 917 } 918 919 if (!inputElement.getChildren().isEmpty()) { 920 resolveInputLogic(inputElement, evalInputLogic, dataNameList); 921 } 922 } 923 } 924 925 /** 926 * Resolve input-events/dataset and output-events/dataset tags. 
927 * 928 * @param eJob : Job element 929 * @throws CoordinatorJobException thrown if failed to resolve input and output events 930 */ 931 @SuppressWarnings("unchecked") 932 private void resolveIODataset(Element eAppXml) throws CoordinatorJobException { 933 // Resolving input-events/data-in 934 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 935 if (inputList != null) { 936 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 937 resolveAttribute("dataset", dataIn, evalInst); 938 939 } 940 } 941 // Resolving output-events/data-out 942 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 943 if (outputList != null) { 944 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 945 resolveAttribute("dataset", dataOut, evalInst); 946 947 } 948 } 949 950 } 951 952 953 /** 954 * Add an attribute into XML element. 955 * 956 * @param attrName :attribute name 957 * @param elem : Element to add attribute 958 * @param value :Value of attribute 959 */ 960 private void addAnAttribute(String attrName, Element elem, String value) { 961 elem.setAttribute(attrName, value); 962 } 963 964 /** 965 * Resolve datasets using job configuration. 966 * 967 * @param eAppXml : Job Element XML 968 * @throws Exception thrown if failed to resolve datasets 969 */ 970 @SuppressWarnings("unchecked") 971 private void resolveDataSets(Element eAppXml) throws Exception { 972 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace()); 973 if (datasetList != null) { 974 975 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace()); 976 resolveDataSets(dsElems); 977 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 978 eAppXml.getNamespace()), evalNofuncs); 979 } 980 } 981 982 /** 983 * Resolve datasets using job configuration. 
984 * 985 * @param dsElems : Data set XML element. 986 * @throws CoordinatorJobException thrown if failed to resolve datasets 987 */ 988 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException { 989 for (Element dsElem : dsElems) { 990 // Setting up default TimeUnit and EndOFDuraion 991 evalFreq.setVariable("timeunit", TimeUnit.MINUTE); 992 evalFreq.setVariable("endOfDuration", TimeUnit.NONE); 993 994 String val = resolveAttribute("frequency", dsElem, evalFreq); 995 int ival = ParamChecker.checkInteger(val, "frequency"); 996 ParamChecker.checkGTZero(ival, "frequency"); 997 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE 998 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString()); 999 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE 1000 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString()); 1001 val = resolveAttribute("initial-instance", dsElem, evalInitialInstance); 1002 ParamChecker.checkDateOozieTZ(val, "initial-instance"); 1003 checkInitialInstance(val); 1004 val = resolveAttribute("timezone", dsElem, evalNofuncs); 1005 ParamChecker.checkTimeZone(val, "timezone"); 1006 resolveTagContents("uri-template", dsElem, evalNofuncs); 1007 resolveTagContents("done-flag", dsElem, evalNofuncs); 1008 } 1009 } 1010 1011 /** 1012 * Resolve the content of a tag. 1013 * 1014 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout> 1015 * @param elem : Element where the tag exists. 1016 * @param eval : EL evealuator 1017 * @return Resolved tag content. 
1018 * @throws CoordinatorJobException thrown if failed to resolve tag content 1019 */ 1020 @SuppressWarnings("unchecked") 1021 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 1022 String ret = ""; 1023 if (elem != null) { 1024 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) { 1025 if (tagElem != null) { 1026 String updated; 1027 try { 1028 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim()); 1029 1030 } 1031 catch (Exception e) { 1032 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 1033 } 1034 tagElem.removeContent(); 1035 tagElem.addContent(updated); 1036 ret += updated; 1037 } 1038 } 1039 } 1040 return ret; 1041 } 1042 1043 /** 1044 * Resolve an attribute value. 1045 * 1046 * @param attrName : Attribute name. 1047 * @param elem : XML Element where attribute is defiend 1048 * @param eval : ELEvaluator used to resolve 1049 * @return Resolved attribute value 1050 * @throws CoordinatorJobException thrown if failed to resolve an attribute value 1051 */ 1052 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 1053 Attribute attr = elem.getAttribute(attrName); 1054 String val = null; 1055 if (attr != null) { 1056 try { 1057 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim()); 1058 } 1059 catch (Exception e) { 1060 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 1061 } 1062 attr.setValue(val); 1063 } 1064 return val; 1065 } 1066 1067 /** 1068 * Include referred datasets into XML. 1069 * 1070 * @param resolvedXml : Job XML element. 
1071 * @param conf : Job configuration 1072 * @throws CoordinatorJobException thrown if failed to include referred datasets into XML 1073 */ 1074 @SuppressWarnings("unchecked") 1075 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException { 1076 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace()); 1077 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace()); 1078 List<String> dsList = new ArrayList<String>(); 1079 if (datasets != null) { 1080 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) { 1081 String incDSFile = includeElem.getTextTrim(); 1082 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace()); 1083 } 1084 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) { 1085 String dsName = e.getAttributeValue("name"); 1086 if (dsList.contains(dsName)) {// Override with this DS 1087 // Remove duplicate 1088 removeDataSet(allDataSets, dsName); 1089 } 1090 else { 1091 dsList.add(dsName); 1092 } 1093 allDataSets.addContent((Element) e.clone()); 1094 } 1095 } 1096 insertDataSet(resolvedXml, allDataSets); 1097 resolvedXml.removeChild("datasets", resolvedXml.getNamespace()); 1098 } 1099 1100 /** 1101 * Include one dataset file. 1102 * 1103 * @param incDSFile : Include data set filename. 1104 * @param dsList :List of dataset names to verify the duplicate. 1105 * @param allDataSets : Element that includes all dataset definitions. 
1106 * @param dsNameSpace : Data set name space 1107 * @throws CoordinatorJobException thrown if failed to include one dataset file 1108 */ 1109 @SuppressWarnings("unchecked") 1110 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace) 1111 throws CoordinatorJobException { 1112 Element tmpDataSets = null; 1113 try { 1114 String dsXml = readDefinition(incDSFile); 1115 LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml); 1116 tmpDataSets = XmlUtils.parseXml(dsXml); 1117 } 1118 catch (JDOMException e) { 1119 LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage()); 1120 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage()); 1121 } 1122 resolveDataSets(tmpDataSets.getChildren("dataset")); 1123 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) { 1124 String dsName = e.getAttributeValue("name"); 1125 if (dsList.contains(dsName)) { 1126 throw new RuntimeException("Duplicate Dataset " + dsName); 1127 } 1128 dsList.add(dsName); 1129 Element tmp = (Element) e.clone(); 1130 // TODO: Don't like to over-write the external/include DS's namespace 1131 tmp.setNamespace(dsNameSpace); 1132 tmp.getChild("uri-template").setNamespace(dsNameSpace); 1133 if (e.getChild("done-flag") != null) { 1134 tmp.getChild("done-flag").setNamespace(dsNameSpace); 1135 } 1136 allDataSets.addContent(tmp); 1137 } 1138 // nested include 1139 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) { 1140 String incFile = includeElem.getTextTrim(); 1141 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace); 1142 } 1143 } 1144 1145 /** 1146 * Remove a dataset from a list of dataset. 1147 * 1148 * @param eDatasets : List of dataset 1149 * @param name : Dataset name to be removed. 
1150 */ 1151 @SuppressWarnings("unchecked") 1152 private static void removeDataSet(Element eDatasets, String name) { 1153 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 1154 if (eDataset.getAttributeValue("name").equals(name)) { 1155 eDataset.detach(); 1156 return; 1157 } 1158 } 1159 throw new RuntimeException("undefined dataset: " + name); 1160 } 1161 1162 /** 1163 * Read coordinator definition. 1164 * 1165 * @param appPath application path. 1166 * @return coordinator definition. 1167 * @throws CoordinatorJobException thrown if the definition could not be read. 1168 */ 1169 protected String readDefinition(String appPath) throws CoordinatorJobException { 1170 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 1171 // Configuration confHadoop = CoordUtils.getHadoopConf(conf); 1172 try { 1173 URI uri = new URI(appPath); 1174 LOG.debug("user =" + user); 1175 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 1176 Configuration fsConf = has.createJobConf(uri.getAuthority()); 1177 FileSystem fs = has.createFileSystem(user, uri, fsConf); 1178 Path appDefPath = null; 1179 1180 // app path could be a directory 1181 Path path = new Path(uri.getPath()); 1182 // check file exists for dataset include file, app xml already checked 1183 if (!fs.exists(path)) { 1184 throw new URISyntaxException(path.toString(), "path not existed : " + path.toString()); 1185 } 1186 if (!fs.isFile(path)) { 1187 appDefPath = new Path(path, COORDINATOR_XML_FILE); 1188 } else { 1189 appDefPath = path; 1190 } 1191 1192 Reader reader = new InputStreamReader(fs.open(appDefPath)); 1193 StringWriter writer = new StringWriter(); 1194 IOUtils.copyCharStream(reader, writer); 1195 return writer.toString(); 1196 } 1197 catch (IOException ex) { 1198 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 1199 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1200 } 
1201 catch (URISyntaxException ex) { 1202 LOG.warn("URISyException :" + ex.getMessage()); 1203 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex); 1204 } 1205 catch (HadoopAccessorException ex) { 1206 throw new CoordinatorJobException(ex); 1207 } 1208 catch (Exception ex) { 1209 LOG.warn("Exception :", ex); 1210 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1211 } 1212 } 1213 1214 /** 1215 * Write a coordinator job into database 1216 * 1217 *@param appXML : Coordinator definition xml 1218 * @param eJob : XML element of job 1219 * @param coordJob : Coordinator job bean 1220 * @return Job id 1221 * @throws CommandException thrown if unable to save coordinator job to db 1222 */ 1223 protected String storeToDB(String appXML, Element eJob, CoordinatorJobBean coordJob) throws CommandException { 1224 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR); 1225 coordJob.setId(jobId); 1226 1227 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH)); 1228 coordJob.setCreatedTime(new Date()); 1229 coordJob.setUser(conf.get(OozieClient.USER_NAME)); 1230 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 1231 coordJob.setGroup(group); 1232 coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 1233 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString()); 1234 coordJob.setLastActionNumber(0); 1235 coordJob.setLastModifiedTime(new Date()); 1236 1237 if (!dryrun) { 1238 coordJob.setLastModifiedTime(new Date()); 1239 try { 1240 CoordJobQueryExecutor.getInstance().insert(coordJob); 1241 } 1242 catch (JPAExecutorException jpaee) { 1243 coordJob.setId(null); 1244 coordJob.setStatus(CoordinatorJob.Status.FAILED); 1245 throw new CommandException(jpaee); 1246 } 1247 } 1248 return jobId; 1249 } 1250 1251 /* 1252 * this method checks if the initial-instance specified for a particular 1253 is not a date earlier than the oozie 
server default Jan 01, 1970 00:00Z UTC 1254 */ 1255 private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException { 1256 Date initialInstance, givenInstance; 1257 try { 1258 initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z"); 1259 givenInstance = DateUtils.parseDateOozieTZ(val); 1260 } 1261 catch (Exception e) { 1262 throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val + 1263 "' to Date object. ",e); 1264 } 1265 if(givenInstance.compareTo(initialInstance) < 0) { 1266 throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val + 1267 " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance)); 1268 } 1269 } 1270 1271 /* (non-Javadoc) 1272 * @see org.apache.oozie.command.XCommand#getEntityKey() 1273 */ 1274 @Override 1275 public String getEntityKey() { 1276 return null; 1277 } 1278 1279 /* (non-Javadoc) 1280 * @see org.apache.oozie.command.XCommand#isLockRequired() 1281 */ 1282 @Override 1283 protected boolean isLockRequired() { 1284 return false; 1285 } 1286 1287 /* (non-Javadoc) 1288 * @see org.apache.oozie.command.XCommand#loadState() 1289 */ 1290 @Override 1291 protected void loadState() throws CommandException { 1292 jpaService = Services.get().get(JPAService.class); 1293 if (jpaService == null) { 1294 throw new CommandException(ErrorCode.E0610); 1295 } 1296 coordJob = new CoordinatorJobBean(); 1297 if (this.bundleId != null) { 1298 // this coord job is created from bundle 1299 coordJob.setBundleId(this.bundleId); 1300 // first use bundle id if submit thru bundle 1301 LogUtils.setLogInfo(this.bundleId); 1302 } 1303 if (this.coordName != null) { 1304 // this coord job is created from bundle 1305 coordJob.setAppName(this.coordName); 1306 } 1307 setJob(coordJob); 1308 1309 } 1310 1311 /* (non-Javadoc) 1312 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 1313 */ 1314 @Override 1315 protected void 
verifyPrecondition() throws CommandException { 1316 1317 } 1318 1319 /* (non-Javadoc) 1320 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 1321 */ 1322 @Override 1323 public void notifyParent() throws CommandException { 1324 // update bundle action 1325 if (coordJob.getBundleId() != null) { 1326 LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId()); 1327 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 1328 bundleStatusUpdate.call(); 1329 } 1330 } 1331 1332 /* (non-Javadoc) 1333 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 1334 */ 1335 @Override 1336 public void updateJob() throws CommandException { 1337 } 1338 1339 /* (non-Javadoc) 1340 * @see org.apache.oozie.command.TransitionXCommand#getJob() 1341 */ 1342 @Override 1343 public Job getJob() { 1344 return coordJob; 1345 } 1346 1347 @Override 1348 public void performWrites() throws CommandException { 1349 } 1350}