001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.Reader; 024import java.io.StringReader; 025import java.io.StringWriter; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.util.ArrayList; 029import java.util.Calendar; 030import java.util.Date; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Map; 036import java.util.Set; 037import java.util.TreeSet; 038 039import javax.xml.transform.stream.StreamSource; 040import javax.xml.validation.Validator; 041 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.oozie.CoordinatorJobBean; 046import org.apache.oozie.ErrorCode; 047import org.apache.oozie.client.CoordinatorJob; 048import org.apache.oozie.client.Job; 049import org.apache.oozie.client.OozieClient; 050import org.apache.oozie.client.CoordinatorJob.Execution; 051import org.apache.oozie.command.CommandException; 052import 
org.apache.oozie.command.SubmitTransitionXCommand; 053import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 054import org.apache.oozie.coord.CoordELEvaluator; 055import org.apache.oozie.coord.CoordELFunctions; 056import org.apache.oozie.coord.CoordUtils; 057import org.apache.oozie.coord.CoordinatorJobException; 058import org.apache.oozie.coord.TimeUnit; 059import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator; 060import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 061import org.apache.oozie.executor.jpa.JPAExecutorException; 062import org.apache.oozie.service.CoordMaterializeTriggerService; 063import org.apache.oozie.service.ConfigurationService; 064import org.apache.oozie.service.HadoopAccessorException; 065import org.apache.oozie.service.HadoopAccessorService; 066import org.apache.oozie.service.JPAService; 067import org.apache.oozie.service.SchemaService; 068import org.apache.oozie.service.Service; 069import org.apache.oozie.service.Services; 070import org.apache.oozie.service.UUIDService; 071import org.apache.oozie.service.SchemaService.SchemaName; 072import org.apache.oozie.service.UUIDService.ApplicationType; 073import org.apache.oozie.util.ConfigUtils; 074import org.apache.oozie.util.DateUtils; 075import org.apache.oozie.util.ELEvaluator; 076import org.apache.oozie.util.ELUtils; 077import org.apache.oozie.util.IOUtils; 078import org.apache.oozie.util.InstrumentUtils; 079import org.apache.oozie.util.LogUtils; 080import org.apache.oozie.util.ParamChecker; 081import org.apache.oozie.util.ParameterVerifier; 082import org.apache.oozie.util.ParameterVerifierException; 083import org.apache.oozie.util.PropertiesUtils; 084import org.apache.oozie.util.XConfiguration; 085import org.apache.oozie.util.XmlUtils; 086import org.jdom.Attribute; 087import org.jdom.Element; 088import org.jdom.JDOMException; 089import org.jdom.Namespace; 090import org.xml.sax.SAXException; 091 092/** 093 * This class provides the functionalities to resolve a 
* coordinator job XML and write the job information into a DB table.
 * <p>
 * Specifically it performs the following functions:
 * <ol>
 * <li>Resolve all the variables or properties using job configurations.</li>
 * <li>Insert all datasets definition as part of the {@code <data-in>} and {@code <data-out>} tags.</li>
 * <li>Validate the XML at runtime.</li>
 * </ol>
 */
public class CoordSubmitXCommand extends SubmitTransitionXCommand {

    /** Job configuration; replaced by a variable-resolved copy in {@code mergeDefaultConfig()}. */
    protected Configuration conf;
    /** Id of the submitting bundle; {@code null} when the command is built from a configuration only. */
    protected final String bundleId;
    /** Coordinator name given by the bundle; {@code null} when the command is built from a configuration only. */
    protected final String coordName;
    /** When {@code true}, {@code submitJob()} returns the dry-run output instead of queueing materialization. */
    protected boolean dryrun;
    protected JPAService jpaService = null;
    private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP;

    /** File name of the optional default configuration looked up next to the application path. */
    public static final String CONFIG_DEFAULT = "coord-config-default.xml";
    public static final String COORDINATOR_XML_FILE = "coordinator.xml";
    // Element names used when checking input/output events for multiple instance values.
    public final String COORD_INPUT_EVENTS ="input-events";
    public final String COORD_OUTPUT_EVENTS = "output-events";
    public final String COORD_INPUT_EVENTS_DATA_IN ="data-in";
    public final String COORD_OUTPUT_EVENTS_DATA_OUT = "data-out";

    /** Property names rejected in the user-supplied configuration; filled by the static initializer. */
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
    /** Property names rejected in the default configuration file; filled by the static initializer. */
    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

    /** Job bean populated during submission. It is not assigned in this part of the file. */
    protected CoordinatorJobBean coordJob = null;
    /**
     * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout
     */
    public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout";

    /** Configuration key read when the application does not specify a concurrency. */
    public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency";

    /** Configuration key read when the application does not specify a throttle. */
    public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle";

    /** Configuration key of the factor multiplied with the queue size to cap the throttle. */
    public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX
            + "coord.materialization.throttling.factor";

    /**
     * Default MAX timeout in minutes, after which coordinator input check will timeout
     */
    public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";

    /** Configuration key of the queue size used to cap the throttle. */
    public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";

    /** Configuration key of the boolean that enables the minimum-frequency check at submission. */
    public static final String CONF_CHECK_MAX_FREQUENCY = Service.CONF_PREFIX + "coord.check.maximum.frequency";

    // EL evaluators: evalData and evalSla are created in resolveInitial(), the others in initEvaluators().
    private ELEvaluator evalFreq = null;
    private ELEvaluator evalNofuncs = null;
    private ELEvaluator evalData = null;
    private ELEvaluator evalInst = null;
    private ELEvaluator evalAction = null;
    private ELEvaluator evalSla = null;
    private ELEvaluator evalTimeout = null;
    private ELEvaluator evalInitialInstance = null;

    static {
        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        // The default-config set is the user set plus HADOOP_USER.
        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Constructor to create the Coordinator Submit Command.
     *
     * @param conf : Configuration for Coordinator job
     */
    public CoordSubmitXCommand(Configuration conf) {
        super("coord_submit", "coord_submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
        this.bundleId = null;
        this.coordName = null;
    }

    /**
     * Constructor to create the Coordinator Submit Command by bundle job.
177 * 178 * @param conf : Configuration for Coordinator job 179 * @param bundleId : bundle id 180 * @param coordName : coord name 181 */ 182 protected CoordSubmitXCommand(Configuration conf, String bundleId, String coordName) { 183 super("coord_submit", "coord_submit", 1); 184 this.conf = ParamChecker.notNull(conf, "conf"); 185 this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId"); 186 this.coordName = ParamChecker.notEmpty(coordName, "coordName"); 187 } 188 189 /** 190 * Constructor to create the Coordinator Submit Command. 191 * 192 * @param dryrun : if dryrun 193 * @param conf : Configuration for Coordinator job 194 */ 195 public CoordSubmitXCommand(boolean dryrun, Configuration conf) { 196 this(conf); 197 this.dryrun = dryrun; 198 } 199 200 /* (non-Javadoc) 201 * @see org.apache.oozie.command.XCommand#execute() 202 */ 203 @Override 204 protected String submit() throws CommandException { 205 LOG.info("STARTED Coordinator Submit"); 206 String jobId = submitJob(); 207 LOG.info("ENDED Coordinator Submit jobId=" + jobId); 208 return jobId; 209 } 210 211 protected String submitJob() throws CommandException { 212 String jobId = null; 213 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 214 215 boolean exceptionOccured = false; 216 try { 217 mergeDefaultConfig(); 218 219 String appXml = readAndValidateXml(); 220 coordJob.setOrigJobXml(appXml); 221 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 222 223 Element eXml = XmlUtils.parseXml(appXml); 224 225 String appNamespace = readAppNamespace(eXml); 226 coordJob.setAppNamespace(appNamespace); 227 228 ParameterVerifier.verifyParameters(conf, eXml); 229 230 appXml = XmlUtils.removeComments(appXml); 231 initEvaluators(); 232 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob); 233 234 validateCoordinatorJob(); 235 236 // checking if the coordinator application data input/output events 237 // specify multiple data instance values in erroneous manner 
238 checkMultipleTimeInstances(eJob, COORD_INPUT_EVENTS, COORD_INPUT_EVENTS_DATA_IN); 239 checkMultipleTimeInstances(eJob, COORD_OUTPUT_EVENTS, COORD_OUTPUT_EVENTS_DATA_OUT); 240 241 LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString()); 242 243 jobId = storeToDB(appXml, eJob, coordJob); 244 // log job info for coordinator job 245 LogUtils.setLogInfo(coordJob); 246 247 if (!dryrun) { 248 queueMaterializeTransitionXCommand(jobId); 249 } 250 else { 251 return getDryRun(coordJob); 252 } 253 } 254 catch (JDOMException jex) { 255 exceptionOccured = true; 256 LOG.warn("ERROR: ", jex); 257 throw new CommandException(ErrorCode.E0700, jex.getMessage(), jex); 258 } 259 catch (CoordinatorJobException cex) { 260 exceptionOccured = true; 261 LOG.warn("ERROR: ", cex); 262 throw new CommandException(cex); 263 } 264 catch (ParameterVerifierException pex) { 265 exceptionOccured = true; 266 LOG.warn("ERROR: ", pex); 267 throw new CommandException(pex); 268 } 269 catch (IllegalArgumentException iex) { 270 exceptionOccured = true; 271 LOG.warn("ERROR: ", iex); 272 throw new CommandException(ErrorCode.E1003, iex.getMessage(), iex); 273 } 274 catch (Exception ex) { 275 exceptionOccured = true; 276 LOG.warn("ERROR: ", ex); 277 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex); 278 } 279 finally { 280 if (exceptionOccured) { 281 if (coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")) { 282 coordJob.setStatus(CoordinatorJob.Status.FAILED); 283 coordJob.resetPending(); 284 } 285 } 286 } 287 return jobId; 288 } 289 290 /** 291 * Gets the dryrun output. 
292 * 293 * @param coordJob the coordinatorJobBean 294 * @return the dry run 295 * @throws Exception the exception 296 */ 297 protected String getDryRun(CoordinatorJobBean coordJob) throws Exception{ 298 int materializationWindow = ConfigurationService 299 .getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW); 300 Date startTime = coordJob.getStartTime(); 301 long startTimeMilli = startTime.getTime(); 302 long endTimeMilli = startTimeMilli + (materializationWindow * 1000); 303 Date jobEndTime = coordJob.getEndTime(); 304 Date endTime = new Date(endTimeMilli); 305 if (endTime.compareTo(jobEndTime) > 0) { 306 endTime = jobEndTime; 307 } 308 String jobId = coordJob.getId(); 309 LOG.info("[" + jobId + "]: Update status to RUNNING"); 310 coordJob.setStatus(Job.Status.RUNNING); 311 coordJob.setPending(); 312 Configuration jobConf = null; 313 try { 314 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 315 } 316 catch (IOException e1) { 317 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1); 318 } 319 String action = new CoordMaterializeTransitionXCommand(coordJob, materializationWindow, startTime, 320 endTime).materializeActions(true); 321 String output = coordJob.getJobXml() + System.getProperty("line.separator") 322 + "***actions for instance***" + action; 323 return output; 324 } 325 326 /** 327 * Queue MaterializeTransitionXCommand 328 */ 329 protected void queueMaterializeTransitionXCommand(String jobId) { 330 int materializationWindow = ConfigurationService 331 .getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW); 332 queue(new CoordMaterializeTransitionXCommand(jobId, materializationWindow), 100); 333 } 334 335 /** 336 * Method that validates values in the definition for correctness. Placeholder to add more. 
337 */ 338 private void validateCoordinatorJob() throws Exception { 339 // check if startTime < endTime 340 if (!coordJob.getStartTime().before(coordJob.getEndTime())) { 341 throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time."); 342 } 343 344 try { 345 // Check if a coord job with cron frequency will materialize actions 346 int freq = Integer.parseInt(coordJob.getFrequency()); 347 348 // Check if the frequency is faster than 5 min if enabled 349 if (ConfigurationService.getBoolean(CONF_CHECK_MAX_FREQUENCY)) { 350 CoordinatorJob.Timeunit unit = coordJob.getTimeUnit(); 351 if (freq == 0 || (freq < 5 && unit == CoordinatorJob.Timeunit.MINUTE)) { 352 throw new IllegalArgumentException("Coordinator job with frequency [" + freq + 353 "] minutes is faster than allowed maximum of 5 minutes (" 354 + CONF_CHECK_MAX_FREQUENCY + " is set to true)"); 355 } 356 } 357 } catch (NumberFormatException e) { 358 Date start = coordJob.getStartTime(); 359 Calendar cal = Calendar.getInstance(); 360 cal.setTime(start); 361 cal.add(Calendar.MINUTE, -1); 362 start = cal.getTime(); 363 364 Date nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(start, coordJob); 365 if (nextTime == null) { 366 throw new IllegalArgumentException("Invalid coordinator cron frequency: " + coordJob.getFrequency()); 367 } 368 if (!nextTime.before(coordJob.getEndTime())) { 369 throw new IllegalArgumentException("Coordinator job with frequency '" + 370 coordJob.getFrequency() + "' materializes no actions between start and end time."); 371 } 372 } 373 } 374 375 /* 376 * Check against multiple data instance values inside a single <instance> <start-instance> or <end-instance> tag 377 * If found, the job is not submitted and user is informed to correct the error, instead of defaulting to the first instance value in the list 378 */ 379 private void checkMultipleTimeInstances(Element eCoordJob, String eventType, String dataType) throws CoordinatorJobException { 380 
Element eventsSpec, dataSpec, instance; 381 List<Element> instanceSpecList; 382 Namespace ns = eCoordJob.getNamespace(); 383 String instanceValue; 384 eventsSpec = eCoordJob.getChild(eventType, ns); 385 if (eventsSpec != null) { 386 dataSpec = eventsSpec.getChild(dataType, ns); 387 if (dataSpec != null) { 388 // In case of input-events, there can be multiple child <instance> datasets. Iterating to ensure none of them have errors 389 instanceSpecList = dataSpec.getChildren("instance", ns); 390 Iterator instanceIter = instanceSpecList.iterator(); 391 while(instanceIter.hasNext()) { 392 instance = ((Element) instanceIter.next()); 393 if(instance.getContentSize() == 0) { //empty string or whitespace 394 throw new CoordinatorJobException(ErrorCode.E1021, "<instance> tag within " + eventType + " is empty!"); 395 } 396 instanceValue = instance.getContent(0).toString(); 397 boolean isInvalid = false; 398 try { 399 isInvalid = evalAction.checkForExistence(instanceValue, ","); 400 } catch (Exception e) { 401 handleELParseException(eventType, dataType, instanceValue); 402 } 403 if (isInvalid) { // reaching this block implies instance is not empty i.e. length > 0 404 handleExpresionWithMultipleInstances(eventType, dataType, instanceValue); 405 } 406 } 407 408 // In case of input-events, there can be multiple child <start-instance> datasets. 
Iterating to ensure none of them have errors 409 instanceSpecList = dataSpec.getChildren("start-instance", ns); 410 instanceIter = instanceSpecList.iterator(); 411 while(instanceIter.hasNext()) { 412 instance = ((Element) instanceIter.next()); 413 if(instance.getContentSize() == 0) { //empty string or whitespace 414 throw new CoordinatorJobException(ErrorCode.E1021, "<start-instance> tag within " + eventType + " is empty!"); 415 } 416 instanceValue = instance.getContent(0).toString(); 417 boolean isInvalid = false; 418 try { 419 isInvalid = evalAction.checkForExistence(instanceValue, ","); 420 } catch (Exception e) { 421 handleELParseException(eventType, dataType, instanceValue); 422 } 423 if (isInvalid) { // reaching this block implies start instance is not empty i.e. length > 0 424 handleExpresionWithStartMultipleInstances(eventType, dataType, instanceValue); 425 } 426 } 427 428 // In case of input-events, there can be multiple child <end-instance> datasets. Iterating to ensure none of them have errors 429 instanceSpecList = dataSpec.getChildren("end-instance", ns); 430 instanceIter = instanceSpecList.iterator(); 431 while(instanceIter.hasNext()) { 432 instance = ((Element) instanceIter.next()); 433 if(instance.getContentSize() == 0) { //empty string or whitespace 434 throw new CoordinatorJobException(ErrorCode.E1021, "<end-instance> tag within " + eventType + " is empty!"); 435 } 436 instanceValue = instance.getContent(0).toString(); 437 boolean isInvalid = false; 438 try { 439 isInvalid = evalAction.checkForExistence(instanceValue, ","); 440 } catch (Exception e) { 441 handleELParseException(eventType, dataType, instanceValue); 442 } 443 if (isInvalid) { // reaching this block implies instance is not empty i.e. 
length > 0 444 handleExpresionWithMultipleEndInstances(eventType, dataType, instanceValue); 445 } 446 } 447 448 } 449 } 450 } 451 452 private void handleELParseException(String eventType, String dataType, String instanceValue) 453 throws CoordinatorJobException { 454 String correctAction = null; 455 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 456 correctAction = "Coordinator app definition should have valid <instance> tag for data-in"; 457 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 458 correctAction = "Coordinator app definition should have valid <instance> tag for data-out"; 459 } 460 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 461 + "' is not valid. Coordinator job NOT SUBMITTED. " + correctAction); 462 } 463 464 private void handleExpresionWithMultipleInstances(String eventType, String dataType, String instanceValue) 465 throws CoordinatorJobException { 466 String correctAction = null; 467 if(dataType.equals(COORD_INPUT_EVENTS_DATA_IN)) { 468 correctAction = "Coordinator app definition should have separate <instance> tag per data-in instance"; 469 } else if(dataType.equals(COORD_OUTPUT_EVENTS_DATA_OUT)) { 470 correctAction = "Coordinator app definition can have only one <instance> tag per data-out instance"; 471 } 472 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " instance '" + instanceValue 473 + "' contains more than one date instance. Coordinator job NOT SUBMITTED. " + correctAction); 474 } 475 476 private void handleExpresionWithStartMultipleInstances(String eventType, String dataType, String instanceValue) 477 throws CoordinatorJobException { 478 String correctAction = "Coordinator app definition should not have multiple start-instances"; 479 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " start-instance '" + instanceValue 480 + "' contains more than one date start-instance. Coordinator job NOT SUBMITTED. 
" + correctAction); 481 } 482 483 private void handleExpresionWithMultipleEndInstances(String eventType, String dataType, String instanceValue) 484 throws CoordinatorJobException { 485 String correctAction = "Coordinator app definition should not have multiple end-instances"; 486 throw new CoordinatorJobException(ErrorCode.E1021, eventType + " end-instance '" + instanceValue 487 + "' contains more than one date end-instance. Coordinator job NOT SUBMITTED. " + correctAction); 488 } 489 /** 490 * Read the application XML and validate against coordinator Schema 491 * 492 * @return validated coordinator XML 493 * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml 494 */ 495 protected String readAndValidateXml() throws CoordinatorJobException { 496 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH), 497 OozieClient.COORDINATOR_APP_PATH); 498 String coordXml = readDefinition(appPath); 499 validateXml(coordXml); 500 return coordXml; 501 } 502 503 /** 504 * Validate against Coordinator XSD file 505 * 506 * @param xmlContent : Input coordinator xml 507 * @throws CoordinatorJobException thrown if unable to validate coordinator xml 508 */ 509 private void validateXml(String xmlContent) throws CoordinatorJobException { 510 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR); 511 Validator validator = schema.newValidator(); 512 try { 513 validator.validate(new StreamSource(new StringReader(xmlContent))); 514 } 515 catch (SAXException ex) { 516 LOG.warn("SAXException :", ex); 517 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex); 518 } 519 catch (IOException ex) { 520 LOG.warn("IOException :", ex); 521 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex); 522 } 523 } 524 525 /** 526 * Read the application XML schema namespace 527 * 528 * @param coordXmlElement input coordinator xml Element 529 * @return app xml 
namespace 530 * @throws CoordinatorJobException 531 */ 532 private String readAppNamespace(Element coordXmlElement) throws CoordinatorJobException { 533 Namespace ns = coordXmlElement.getNamespace(); 534 if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 535 throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace " 536 + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later"); 537 } 538 if (ns != null) { 539 return ns.getURI(); 540 } 541 else { 542 throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given"); 543 } 544 } 545 546 /** 547 * Merge default configuration with user-defined configuration. 548 * 549 * @throws CommandException thrown if failed to read or merge configurations 550 */ 551 protected void mergeDefaultConfig() throws CommandException { 552 Path configDefault = null; 553 try { 554 String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH); 555 Path coordAppPath = new Path(coordAppPathStr); 556 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 557 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 558 Configuration fsConf = has.createJobConf(coordAppPath.toUri().getAuthority()); 559 FileSystem fs = has.createFileSystem(user, coordAppPath.toUri(), fsConf); 560 561 // app path could be a directory 562 if (!fs.isFile(coordAppPath)) { 563 configDefault = new Path(coordAppPath, CONFIG_DEFAULT); 564 } else { 565 configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT); 566 } 567 568 if (fs.exists(configDefault)) { 569 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 570 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 571 XConfiguration.injectDefaults(defaultConf, conf); 572 } 573 else { 574 LOG.info("configDefault Doesn't exist " + configDefault); 575 } 576 
PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 577 578 // Resolving all variables in the job properties. 579 // This ensures the Hadoop Configuration semantics is preserved. 580 XConfiguration resolvedVarsConf = new XConfiguration(); 581 for (Map.Entry<String, String> entry : conf) { 582 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 583 } 584 conf = resolvedVarsConf; 585 } 586 catch (IOException e) { 587 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 588 + configDefault, e); 589 } 590 catch (HadoopAccessorException e) { 591 throw new CommandException(e); 592 } 593 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 594 } 595 596 /** 597 * The method resolve all the variables that are defined in configuration. It also include the data set definition 598 * from dataset file into XML. 599 * 600 * @param appXml : Original job XML 601 * @param conf : Configuration of the job 602 * @param coordJob : Coordinator job bean to be populated. 603 * @return Resolved and modified job XML element. 604 * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets 605 * @throws Exception thrown if failed to resolve basic entities or include referred datasets 606 */ 607 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob) 608 throws CoordinatorJobException, Exception { 609 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob); 610 includeDataSets(basicResolvedApp, conf); 611 return basicResolvedApp; 612 } 613 614 /** 615 * Insert data set into data-in and data-out tags. 
616 * 617 * @param eAppXml : coordinator application XML 618 * @param eDatasets : DataSet XML 619 */ 620 @SuppressWarnings("unchecked") 621 private void insertDataSet(Element eAppXml, Element eDatasets) { 622 // Adding DS definition in the coordinator XML 623 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 624 if (inputList != null) { 625 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 626 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset")); 627 dataIn.getContent().add(0, eDataset); 628 } 629 } 630 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 631 if (outputList != null) { 632 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 633 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset")); 634 dataOut.getContent().add(0, eDataset); 635 } 636 } 637 } 638 639 /** 640 * Find a specific dataset from a list of Datasets. 641 * 642 * @param eDatasets : List of data sets 643 * @param name : queried data set name 644 * @return one Dataset element. otherwise throw Exception 645 */ 646 @SuppressWarnings("unchecked") 647 private static Element findDataSet(Element eDatasets, String name) { 648 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 649 if (eDataset.getAttributeValue("name").equals(name)) { 650 eDataset = (Element) eDataset.clone(); 651 eDataset.detach(); 652 return eDataset; 653 } 654 } 655 throw new RuntimeException("undefined dataset: " + name); 656 } 657 658 /** 659 * Initialize all the required EL Evaluators. 
660 */ 661 protected void initEvaluators() { 662 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq"); 663 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs"); 664 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances"); 665 evalAction = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-action-start"); 666 evalTimeout = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-wait-timeout"); 667 evalInitialInstance = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-initial-instance"); 668 669 } 670 671 /** 672 * Resolve basic entities using job Configuration. 673 * 674 * @param conf :Job configuration 675 * @param appXml : Original job XML 676 * @param coordJob : Coordinator job bean to be populated. 677 * @return Resolved job XML element. 678 * @throws CoordinatorJobException thrown if failed to resolve basic entities 679 * @throws Exception thrown if failed to resolve basic entities 680 */ 681 @SuppressWarnings("unchecked") 682 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob) 683 throws CoordinatorJobException, Exception { 684 Element eAppXml = XmlUtils.parseXml(appXml); 685 // job's main attributes 686 // frequency 687 String val = resolveAttribute("frequency", eAppXml, evalFreq); 688 int ival = 0; 689 690 val = ParamChecker.checkFrequency(val); 691 coordJob.setFrequency(val); 692 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq 693 .getVariable("timeunit")); 694 try { 695 Integer.parseInt(val); 696 } 697 catch (NumberFormatException ex) { 698 tmp=TimeUnit.CRON; 699 } 700 701 addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); 702 // TimeUnit 703 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString())); 704 // End Of Duration 705 tmp = evalFreq.getVariable("endOfDuration") == null ? 
TimeUnit.NONE : ((TimeUnit) evalFreq 706 .getVariable("endOfDuration")); 707 addAnAttribute("end_of_duration", eAppXml, tmp.toString()); 708 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean 709 710 // Application name 711 if (this.coordName == null) { 712 String name = ELUtils.resolveAppName(eAppXml.getAttribute("name").getValue(), conf); 713 coordJob.setAppName(name); 714 } 715 else { 716 // this coord job is created from bundle 717 coordJob.setAppName(this.coordName); 718 } 719 720 // start time 721 val = resolveAttribute("start", eAppXml, evalNofuncs); 722 ParamChecker.checkDateOozieTZ(val, "start"); 723 coordJob.setStartTime(DateUtils.parseDateOozieTZ(val)); 724 // end time 725 val = resolveAttribute("end", eAppXml, evalNofuncs); 726 ParamChecker.checkDateOozieTZ(val, "end"); 727 coordJob.setEndTime(DateUtils.parseDateOozieTZ(val)); 728 // Time zone 729 val = resolveAttribute("timezone", eAppXml, evalNofuncs); 730 ParamChecker.checkTimeZone(val, "timezone"); 731 coordJob.setTimeZone(val); 732 733 // controls 734 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalTimeout); 735 if (val != null && val != "") { 736 int t = Integer.parseInt(val); 737 tmp = (evalTimeout.getVariable("timeunit") == null) ? 
TimeUnit.MINUTE : ((TimeUnit) evalTimeout 738 .getVariable("timeunit")); 739 switch (tmp) { 740 case HOUR: 741 val = String.valueOf(t * 60); 742 break; 743 case DAY: 744 val = String.valueOf(t * 60 * 24); 745 break; 746 case MONTH: 747 val = String.valueOf(t * 60 * 24 * 30); 748 break; 749 default: 750 break; 751 } 752 } 753 else { 754 val = ConfigurationService.get(CONF_DEFAULT_TIMEOUT_NORMAL); 755 } 756 757 ival = ParamChecker.checkInteger(val, "timeout"); 758 if (ival < 0 || ival > ConfigurationService.getInt(CONF_DEFAULT_MAX_TIMEOUT)) { 759 ival = ConfigurationService.getInt(CONF_DEFAULT_MAX_TIMEOUT); 760 } 761 coordJob.setTimeout(ival); 762 763 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 764 if (val == null || val.isEmpty()) { 765 val = ConfigurationService.get(CONF_DEFAULT_CONCURRENCY); 766 } 767 ival = ParamChecker.checkInteger(val, "concurrency"); 768 coordJob.setConcurrency(ival); 769 770 val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 771 if (val == null || val.isEmpty()) { 772 int defaultThrottle = ConfigurationService.getInt(CONF_DEFAULT_THROTTLE); 773 ival = defaultThrottle; 774 } 775 else { 776 ival = ParamChecker.checkInteger(val, "throttle"); 777 } 778 int maxQueue = ConfigurationService.getInt(CONF_QUEUE_SIZE); 779 float factor = ConfigurationService.getFloat(CONF_MAT_THROTTLING_FACTOR); 780 int maxThrottle = (int) (maxQueue * factor); 781 if (ival > maxThrottle || ival < 1) { 782 ival = maxThrottle; 783 } 784 LOG.debug("max throttle " + ival); 785 coordJob.setMatThrottling(ival); 786 787 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 788 if (val == "") { 789 val = Execution.FIFO.toString(); 790 } 791 coordJob.setExecutionOrder(Execution.valueOf(val)); 792 String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString(), 793 
Execution.NONE.toString()}; 794 ParamChecker.isMember(val, acceptedVals, "execution"); 795 796 // datasets 797 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs); 798 // for each data set 799 resolveDataSets(eAppXml); 800 HashMap<String, String> dataNameList = new HashMap<String, String>(); 801 resolveIODataset(eAppXml); 802 resolveIOEvents(eAppXml, dataNameList); 803 804 if (CoordUtils.isInputLogicSpecified(eAppXml)) { 805 resolveInputLogic(eAppXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAppXml.getNamespace()), evalInst, 806 dataNameList); 807 } 808 809 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 810 eAppXml.getNamespace()), evalNofuncs); 811 // TODO: If action or workflow tag is missing, NullPointerException will 812 // occur 813 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 814 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace()); 815 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList); 816 if (configElem != null) { 817 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 818 resolveTagContents("name", propElem, evalData); 819 // Want to check the data-integrity but don't want to modify the 820 // XML 821 // for properties only 822 Element tmpProp = (Element) propElem.clone(); 823 resolveTagContents("value", tmpProp, evalData); 824 } 825 } 826 evalSla = CoordELEvaluator.createELEvaluatorForDataAndConf(conf, "coord-sla-submit", dataNameList); 827 resolveSLA(eAppXml, coordJob); 828 return eAppXml; 829 } 830 831 /** 832 * Resolve SLA events 833 * 834 * @param eAppXml job XML 835 * @param coordJob coordinator job bean 836 * @throws CommandException thrown if failed to resolve sla events 837 */ 838 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws 
CommandException { 839 Element eSla = XmlUtils.getSLAElement(eAppXml.getChild("action", eAppXml.getNamespace())); 840 841 if (eSla != null) { 842 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 843 try { 844 // EL evaluation 845 slaXml = evalSla.evaluate(slaXml, String.class); 846 // Validate against semantic SXD 847 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 848 } 849 catch (Exception e) { 850 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e); 851 } 852 } 853 } 854 855 /** 856 * Resolve input-events/data-in and output-events/data-out tags. 857 * 858 * @param eJobOrg : Job element 859 * @throws CoordinatorJobException thrown if failed to resolve input and output events 860 */ 861 @SuppressWarnings("unchecked") 862 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException { 863 // Resolving input-events/data-in 864 // Clone the job and don't update anything in the original 865 Element eJob = (Element) eJobOrg.clone(); 866 Element inputList = eJob.getChild("input-events", eJob.getNamespace()); 867 if (inputList != null) { 868 TreeSet<String> eventNameSet = new TreeSet<String>(); 869 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) { 870 String dataInName = dataIn.getAttributeValue("name"); 871 dataNameList.put(dataInName, "data-in"); 872 // check whether there is any duplicate data-in name 873 if (eventNameSet.contains(dataInName)) { 874 throw new RuntimeException("Duplicate dataIn name " + dataInName); 875 } 876 else { 877 eventNameSet.add(dataInName); 878 } 879 resolveTagContents("instance", dataIn, evalInst); 880 resolveTagContents("start-instance", dataIn, evalInst); 881 resolveTagContents("end-instance", dataIn, evalInst); 882 883 } 884 } 885 // Resolving output-events/data-out 886 Element outputList = eJob.getChild("output-events", eJob.getNamespace()); 887 if (outputList != null) { 888 TreeSet<String> 
eventNameSet = new TreeSet<String>(); 889 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) { 890 String dataOutName = dataOut.getAttributeValue("name"); 891 dataNameList.put(dataOutName, "data-out"); 892 // check whether there is any duplicate data-out name 893 if (eventNameSet.contains(dataOutName)) { 894 throw new RuntimeException("Duplicate dataIn name " + dataOutName); 895 } 896 else { 897 eventNameSet.add(dataOutName); 898 } 899 resolveTagContents("instance", dataOut, evalInst); 900 901 } 902 } 903 904 } 905 906 private void resolveInputLogic(Element root, ELEvaluator evalInputLogic, HashMap<String, String> dataNameList) 907 throws Exception { 908 for (Object event : root.getChildren()) { 909 Element inputElement = (Element) event; 910 resolveAttribute("dataset", inputElement, evalInputLogic); 911 String name=resolveAttribute("name", inputElement, evalInputLogic); 912 resolveAttribute("or", inputElement, evalInputLogic); 913 resolveAttribute("and", inputElement, evalInputLogic); 914 resolveAttribute("combine", inputElement, evalInputLogic); 915 916 if (name != null) { 917 dataNameList.put(name, "data-in"); 918 } 919 920 if (!inputElement.getChildren().isEmpty()) { 921 resolveInputLogic(inputElement, evalInputLogic, dataNameList); 922 } 923 } 924 } 925 926 /** 927 * Resolve input-events/dataset and output-events/dataset tags. 
928 * 929 * @param eJob : Job element 930 * @throws CoordinatorJobException thrown if failed to resolve input and output events 931 */ 932 @SuppressWarnings("unchecked") 933 private void resolveIODataset(Element eAppXml) throws CoordinatorJobException { 934 // Resolving input-events/data-in 935 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 936 if (inputList != null) { 937 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 938 resolveAttribute("dataset", dataIn, evalInst); 939 940 } 941 } 942 // Resolving output-events/data-out 943 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 944 if (outputList != null) { 945 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 946 resolveAttribute("dataset", dataOut, evalInst); 947 948 } 949 } 950 951 } 952 953 954 /** 955 * Add an attribute into XML element. 956 * 957 * @param attrName :attribute name 958 * @param elem : Element to add attribute 959 * @param value :Value of attribute 960 */ 961 private void addAnAttribute(String attrName, Element elem, String value) { 962 elem.setAttribute(attrName, value); 963 } 964 965 /** 966 * Resolve datasets using job configuration. 967 * 968 * @param eAppXml : Job Element XML 969 * @throws Exception thrown if failed to resolve datasets 970 */ 971 @SuppressWarnings("unchecked") 972 private void resolveDataSets(Element eAppXml) throws Exception { 973 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace()); 974 if (datasetList != null) { 975 976 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace()); 977 resolveDataSets(dsElems); 978 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 979 eAppXml.getNamespace()), evalNofuncs); 980 } 981 } 982 983 /** 984 * Resolve datasets using job configuration. 
985 * 986 * @param dsElems : Data set XML element. 987 * @throws CoordinatorJobException thrown if failed to resolve datasets 988 */ 989 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException { 990 for (Element dsElem : dsElems) { 991 // Setting up default TimeUnit and EndOFDuraion 992 evalFreq.setVariable("timeunit", TimeUnit.MINUTE); 993 evalFreq.setVariable("endOfDuration", TimeUnit.NONE); 994 995 String val = resolveAttribute("frequency", dsElem, evalFreq); 996 int ival = ParamChecker.checkInteger(val, "frequency"); 997 ParamChecker.checkGTZero(ival, "frequency"); 998 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE 999 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString()); 1000 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE 1001 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString()); 1002 val = resolveAttribute("initial-instance", dsElem, evalInitialInstance); 1003 ParamChecker.checkDateOozieTZ(val, "initial-instance"); 1004 checkInitialInstance(val); 1005 val = resolveAttribute("timezone", dsElem, evalNofuncs); 1006 ParamChecker.checkTimeZone(val, "timezone"); 1007 resolveTagContents("uri-template", dsElem, evalNofuncs); 1008 resolveTagContents("done-flag", dsElem, evalNofuncs); 1009 } 1010 } 1011 1012 /** 1013 * Resolve the content of a tag. 1014 * 1015 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout> 1016 * @param elem : Element where the tag exists. 1017 * @param eval : EL evealuator 1018 * @return Resolved tag content. 
1019 * @throws CoordinatorJobException thrown if failed to resolve tag content 1020 */ 1021 @SuppressWarnings("unchecked") 1022 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 1023 String ret = ""; 1024 if (elem != null) { 1025 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) { 1026 if (tagElem != null) { 1027 String updated; 1028 try { 1029 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim()); 1030 1031 } 1032 catch (Exception e) { 1033 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 1034 } 1035 tagElem.removeContent(); 1036 tagElem.addContent(updated); 1037 ret += updated; 1038 } 1039 } 1040 } 1041 return ret; 1042 } 1043 1044 /** 1045 * Resolve an attribute value. 1046 * 1047 * @param attrName : Attribute name. 1048 * @param elem : XML Element where attribute is defiend 1049 * @param eval : ELEvaluator used to resolve 1050 * @return Resolved attribute value 1051 * @throws CoordinatorJobException thrown if failed to resolve an attribute value 1052 */ 1053 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 1054 Attribute attr = elem.getAttribute(attrName); 1055 String val = null; 1056 if (attr != null) { 1057 try { 1058 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim()); 1059 } 1060 catch (Exception e) { 1061 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 1062 } 1063 attr.setValue(val); 1064 } 1065 return val; 1066 } 1067 1068 /** 1069 * Include referred datasets into XML. 1070 * 1071 * @param resolvedXml : Job XML element. 
1072 * @param conf : Job configuration 1073 * @throws CoordinatorJobException thrown if failed to include referred datasets into XML 1074 */ 1075 @SuppressWarnings("unchecked") 1076 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException { 1077 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace()); 1078 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace()); 1079 List<String> dsList = new ArrayList<String>(); 1080 if (datasets != null) { 1081 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) { 1082 String incDSFile = includeElem.getTextTrim(); 1083 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace()); 1084 } 1085 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) { 1086 String dsName = e.getAttributeValue("name"); 1087 if (dsList.contains(dsName)) {// Override with this DS 1088 // Remove duplicate 1089 removeDataSet(allDataSets, dsName); 1090 } 1091 else { 1092 dsList.add(dsName); 1093 } 1094 allDataSets.addContent((Element) e.clone()); 1095 } 1096 } 1097 insertDataSet(resolvedXml, allDataSets); 1098 resolvedXml.removeChild("datasets", resolvedXml.getNamespace()); 1099 } 1100 1101 /** 1102 * Include one dataset file. 1103 * 1104 * @param incDSFile : Include data set filename. 1105 * @param dsList :List of dataset names to verify the duplicate. 1106 * @param allDataSets : Element that includes all dataset definitions. 
1107 * @param dsNameSpace : Data set name space 1108 * @throws CoordinatorJobException thrown if failed to include one dataset file 1109 */ 1110 @SuppressWarnings("unchecked") 1111 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace) 1112 throws CoordinatorJobException { 1113 Element tmpDataSets = null; 1114 try { 1115 String dsXml = readDefinition(incDSFile); 1116 LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml); 1117 tmpDataSets = XmlUtils.parseXml(dsXml); 1118 } 1119 catch (JDOMException e) { 1120 LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage()); 1121 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage()); 1122 } 1123 resolveDataSets(tmpDataSets.getChildren("dataset")); 1124 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) { 1125 String dsName = e.getAttributeValue("name"); 1126 if (dsList.contains(dsName)) { 1127 throw new RuntimeException("Duplicate Dataset " + dsName); 1128 } 1129 dsList.add(dsName); 1130 Element tmp = (Element) e.clone(); 1131 // TODO: Don't like to over-write the external/include DS's namespace 1132 tmp.setNamespace(dsNameSpace); 1133 tmp.getChild("uri-template").setNamespace(dsNameSpace); 1134 if (e.getChild("done-flag") != null) { 1135 tmp.getChild("done-flag").setNamespace(dsNameSpace); 1136 } 1137 allDataSets.addContent(tmp); 1138 } 1139 // nested include 1140 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) { 1141 String incFile = includeElem.getTextTrim(); 1142 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace); 1143 } 1144 } 1145 1146 /** 1147 * Remove a dataset from a list of dataset. 1148 * 1149 * @param eDatasets : List of dataset 1150 * @param name : Dataset name to be removed. 
1151 */ 1152 @SuppressWarnings("unchecked") 1153 private static void removeDataSet(Element eDatasets, String name) { 1154 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 1155 if (eDataset.getAttributeValue("name").equals(name)) { 1156 eDataset.detach(); 1157 return; 1158 } 1159 } 1160 throw new RuntimeException("undefined dataset: " + name); 1161 } 1162 1163 /** 1164 * Read coordinator definition. 1165 * 1166 * @param appPath application path. 1167 * @return coordinator definition. 1168 * @throws CoordinatorJobException thrown if the definition could not be read. 1169 */ 1170 protected String readDefinition(String appPath) throws CoordinatorJobException { 1171 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 1172 // Configuration confHadoop = CoordUtils.getHadoopConf(conf); 1173 try { 1174 URI uri = new URI(appPath); 1175 LOG.debug("user =" + user); 1176 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 1177 Configuration fsConf = has.createJobConf(uri.getAuthority()); 1178 FileSystem fs = has.createFileSystem(user, uri, fsConf); 1179 Path appDefPath = null; 1180 1181 // app path could be a directory 1182 Path path = new Path(uri.getPath()); 1183 // check file exists for dataset include file, app xml already checked 1184 if (!fs.exists(path)) { 1185 throw new URISyntaxException(path.toString(), "path not existed : " + path.toString()); 1186 } 1187 if (!fs.isFile(path)) { 1188 appDefPath = new Path(path, COORDINATOR_XML_FILE); 1189 } else { 1190 appDefPath = path; 1191 } 1192 1193 Reader reader = new InputStreamReader(fs.open(appDefPath)); 1194 StringWriter writer = new StringWriter(); 1195 IOUtils.copyCharStream(reader, writer); 1196 return writer.toString(); 1197 } 1198 catch (IOException ex) { 1199 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 1200 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1201 } 
1202 catch (URISyntaxException ex) { 1203 LOG.warn("URISyException :" + ex.getMessage()); 1204 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex); 1205 } 1206 catch (HadoopAccessorException ex) { 1207 throw new CoordinatorJobException(ex); 1208 } 1209 catch (Exception ex) { 1210 LOG.warn("Exception :", ex); 1211 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 1212 } 1213 } 1214 1215 /** 1216 * Write a coordinator job into database 1217 * 1218 *@param appXML : Coordinator definition xml 1219 * @param eJob : XML element of job 1220 * @param coordJob : Coordinator job bean 1221 * @return Job id 1222 * @throws CommandException thrown if unable to save coordinator job to db 1223 */ 1224 protected String storeToDB(String appXML, Element eJob, CoordinatorJobBean coordJob) throws CommandException { 1225 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR); 1226 coordJob.setId(jobId); 1227 1228 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH)); 1229 coordJob.setCreatedTime(new Date()); 1230 coordJob.setUser(conf.get(OozieClient.USER_NAME)); 1231 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 1232 coordJob.setGroup(group); 1233 coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 1234 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString()); 1235 coordJob.setLastActionNumber(0); 1236 coordJob.setLastModifiedTime(new Date()); 1237 1238 if (!dryrun) { 1239 coordJob.setLastModifiedTime(new Date()); 1240 try { 1241 CoordJobQueryExecutor.getInstance().insert(coordJob); 1242 } 1243 catch (JPAExecutorException jpaee) { 1244 coordJob.setId(null); 1245 coordJob.setStatus(CoordinatorJob.Status.FAILED); 1246 throw new CommandException(jpaee); 1247 } 1248 } 1249 return jobId; 1250 } 1251 1252 /* 1253 * this method checks if the initial-instance specified for a particular 1254 is not a date earlier than the oozie 
server default Jan 01, 1970 00:00Z UTC 1255 */ 1256 private void checkInitialInstance(String val) throws CoordinatorJobException, IllegalArgumentException { 1257 Date initialInstance, givenInstance; 1258 try { 1259 initialInstance = DateUtils.parseDateUTC("1970-01-01T00:00Z"); 1260 givenInstance = DateUtils.parseDateOozieTZ(val); 1261 } 1262 catch (Exception e) { 1263 throw new IllegalArgumentException("Unable to parse dataset initial-instance string '" + val + 1264 "' to Date object. ",e); 1265 } 1266 if(givenInstance.compareTo(initialInstance) < 0) { 1267 throw new CoordinatorJobException(ErrorCode.E1021, "Dataset initial-instance " + val + 1268 " is earlier than the default initial instance " + DateUtils.formatDateOozieTZ(initialInstance)); 1269 } 1270 } 1271 1272 /* (non-Javadoc) 1273 * @see org.apache.oozie.command.XCommand#getEntityKey() 1274 */ 1275 @Override 1276 public String getEntityKey() { 1277 return null; 1278 } 1279 1280 /* (non-Javadoc) 1281 * @see org.apache.oozie.command.XCommand#isLockRequired() 1282 */ 1283 @Override 1284 protected boolean isLockRequired() { 1285 return false; 1286 } 1287 1288 /* (non-Javadoc) 1289 * @see org.apache.oozie.command.XCommand#loadState() 1290 */ 1291 @Override 1292 protected void loadState() throws CommandException { 1293 jpaService = Services.get().get(JPAService.class); 1294 if (jpaService == null) { 1295 throw new CommandException(ErrorCode.E0610); 1296 } 1297 coordJob = new CoordinatorJobBean(); 1298 if (this.bundleId != null) { 1299 // this coord job is created from bundle 1300 coordJob.setBundleId(this.bundleId); 1301 // first use bundle id if submit thru bundle 1302 LogUtils.setLogInfo(this.bundleId); 1303 } 1304 if (this.coordName != null) { 1305 // this coord job is created from bundle 1306 coordJob.setAppName(this.coordName); 1307 } 1308 setJob(coordJob); 1309 1310 } 1311 1312 /* (non-Javadoc) 1313 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 1314 */ 1315 @Override 1316 protected void 
verifyPrecondition() throws CommandException { 1317 1318 } 1319 1320 /* (non-Javadoc) 1321 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 1322 */ 1323 @Override 1324 public void notifyParent() throws CommandException { 1325 // update bundle action 1326 if (coordJob.getBundleId() != null) { 1327 LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId()); 1328 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 1329 bundleStatusUpdate.call(); 1330 } 1331 } 1332 1333 /* (non-Javadoc) 1334 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 1335 */ 1336 @Override 1337 public void updateJob() throws CommandException { 1338 } 1339 1340 /* (non-Javadoc) 1341 * @see org.apache.oozie.command.TransitionXCommand#getJob() 1342 */ 1343 @Override 1344 public Job getJob() { 1345 return coordJob; 1346 } 1347 1348 @Override 1349 public void performWrites() throws CommandException { 1350 } 1351}