001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.InputStreamReader; 022 import java.io.Reader; 023 import java.io.StringReader; 024 import java.io.StringWriter; 025 import java.net.URI; 026 import java.net.URISyntaxException; 027 import java.util.ArrayList; 028 import java.util.Date; 029 import java.util.HashMap; 030 import java.util.HashSet; 031 import java.util.List; 032 import java.util.Map; 033 import java.util.Set; 034 import java.util.TreeSet; 035 036 import javax.xml.transform.stream.StreamSource; 037 import javax.xml.validation.Validator; 038 039 import org.apache.hadoop.conf.Configuration; 040 import org.apache.hadoop.fs.FileSystem; 041 import org.apache.hadoop.fs.Path; 042 import org.apache.oozie.CoordinatorJobBean; 043 import org.apache.oozie.ErrorCode; 044 import org.apache.oozie.client.CoordinatorJob; 045 import org.apache.oozie.client.Job; 046 import org.apache.oozie.client.OozieClient; 047 import org.apache.oozie.client.CoordinatorJob.Execution; 048 import org.apache.oozie.command.CommandException; 049 import org.apache.oozie.command.SubmitTransitionXCommand; 050 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 051 import org.apache.oozie.coord.CoordELEvaluator; 052 import org.apache.oozie.coord.CoordELFunctions; 053 import org.apache.oozie.coord.CoordinatorJobException; 054 import org.apache.oozie.coord.TimeUnit; 055 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; 056 import org.apache.oozie.executor.jpa.JPAExecutorException; 057 import org.apache.oozie.service.DagXLogInfoService; 058 import org.apache.oozie.service.HadoopAccessorException; 059 import org.apache.oozie.service.HadoopAccessorService; 060 import org.apache.oozie.service.JPAService; 061 import org.apache.oozie.service.SchemaService; 062 import org.apache.oozie.service.Service; 063 import org.apache.oozie.service.Services; 064 import org.apache.oozie.service.UUIDService; 065 import org.apache.oozie.service.WorkflowAppService; 066 import org.apache.oozie.service.SchemaService.SchemaName; 067 import org.apache.oozie.service.UUIDService.ApplicationType; 068 import org.apache.oozie.util.DateUtils; 069 import org.apache.oozie.util.ELEvaluator; 070 import org.apache.oozie.util.IOUtils; 071 import org.apache.oozie.util.InstrumentUtils; 072 import org.apache.oozie.util.LogUtils; 073 import org.apache.oozie.util.ParamChecker; 074 import org.apache.oozie.util.PropertiesUtils; 075 import org.apache.oozie.util.XConfiguration; 076 import org.apache.oozie.util.XLog; 077 import org.apache.oozie.util.XmlUtils; 078 import org.jdom.Attribute; 079 import org.jdom.Element; 080 import org.jdom.JDOMException; 081 import org.jdom.Namespace; 082 import org.xml.sax.SAXException; 083 084 /** 085 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB 086 * table. 087 * <p/> 088 * Specifically it performs the following functions: 1. Resolve all the variables or properties using job 089 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML 090 * at runtime. 091 */ 092 public class CoordSubmitXCommand extends SubmitTransitionXCommand { 093 094 private Configuration conf; 095 private final String authToken; 096 private final String bundleId; 097 private final String coordName; 098 private boolean dryrun; 099 private JPAService jpaService = null; 100 private CoordinatorJob.Status prevStatus = CoordinatorJob.Status.PREP; 101 102 public static final String CONFIG_DEFAULT = "coord-config-default.xml"; 103 public static final String COORDINATOR_XML_FILE = "coordinator.xml"; 104 105 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 106 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 107 108 private CoordinatorJobBean coordJob = null; 109 /** 110 * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout 111 */ 112 public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout"; 113 114 public static final String CONF_DEFAULT_CONCURRENCY = Service.CONF_PREFIX + "coord.default.concurrency"; 115 116 public static final String CONF_DEFAULT_THROTTLE = Service.CONF_PREFIX + "coord.default.throttle"; 117 118 public static final String CONF_MAT_THROTTLING_FACTOR = Service.CONF_PREFIX 119 + "coord.materialization.throttling.factor"; 120 121 /** 122 * Default MAX timeout in minutes, after which coordinator input check will timeout 123 */ 124 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; 125 126 127 public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size"; 128 129 private ELEvaluator evalFreq = null; 130 private ELEvaluator evalNofuncs = null; 131 private ELEvaluator evalData = null; 132 private ELEvaluator evalInst = null; 133 private ELEvaluator evalSla = null; 134 135 static { 136 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 137 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 138 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 139 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 140 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 141 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 142 143 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI, 144 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME }; 145 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 146 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 147 } 148 149 /** 150 * Constructor to create the Coordinator Submit Command. 151 * 152 * @param conf : Configuration for Coordinator job 153 * @param authToken : To be used for authentication 154 */ 155 public CoordSubmitXCommand(Configuration conf, String authToken) { 156 super("coord_submit", "coord_submit", 1); 157 this.conf = ParamChecker.notNull(conf, "conf"); 158 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 159 this.bundleId = null; 160 this.coordName = null; 161 } 162 163 /** 164 * Constructor to create the Coordinator Submit Command by bundle job. 165 * 166 * @param conf : Configuration for Coordinator job 167 * @param authToken : To be used for authentication 168 * @param bundleId : bundle id 169 * @param coordName : coord name 170 */ 171 public CoordSubmitXCommand(Configuration conf, String authToken, String bundleId, String coordName) { 172 super("coord_submit", "coord_submit", 1); 173 this.conf = ParamChecker.notNull(conf, "conf"); 174 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 175 this.bundleId = ParamChecker.notEmpty(bundleId, "bundleId"); 176 this.coordName = ParamChecker.notEmpty(coordName, "coordName"); 177 } 178 179 /** 180 * Constructor to create the Coordinator Submit Command. 181 * 182 * @param dryrun : if dryrun 183 * @param conf : Configuration for Coordinator job 184 * @param authToken : To be used for authentication 185 */ 186 public CoordSubmitXCommand(boolean dryrun, Configuration conf, String authToken) { 187 this(conf, authToken); 188 this.dryrun = dryrun; 189 } 190 191 /* (non-Javadoc) 192 * @see org.apache.oozie.command.XCommand#execute() 193 */ 194 @Override 195 protected String submit() throws CommandException { 196 String jobId = null; 197 LOG.info("STARTED Coordinator Submit"); 198 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 199 200 boolean exceptionOccured = false; 201 try { 202 mergeDefaultConfig(); 203 204 String appXml = readAndValidateXml(); 205 coordJob.setOrigJobXml(appXml); 206 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 207 208 String appNamespace = readAppNamespace(appXml); 209 coordJob.setAppNamespace(appNamespace); 210 211 appXml = XmlUtils.removeComments(appXml); 212 initEvaluators(); 213 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob); 214 LOG.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString()); 215 216 jobId = storeToDB(eJob, coordJob); 217 // log job info for coordinator job 218 LogUtils.setLogInfo(coordJob, logInfo); 219 LOG = XLog.resetPrefix(LOG); 220 221 if (!dryrun) { 222 // submit a command to materialize jobs for the next 1 hour (3600 secs) 223 // so we don't wait 10 mins for the Service to run. 224 queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100); 225 } 226 else { 227 Date startTime = coordJob.getStartTime(); 228 long startTimeMilli = startTime.getTime(); 229 long endTimeMilli = startTimeMilli + (3600 * 1000); 230 Date jobEndTime = coordJob.getEndTime(); 231 Date endTime = new Date(endTimeMilli); 232 if (endTime.compareTo(jobEndTime) > 0) { 233 endTime = jobEndTime; 234 } 235 jobId = coordJob.getId(); 236 LOG.info("[" + jobId + "]: Update status to RUNNING"); 237 coordJob.setStatus(Job.Status.RUNNING); 238 coordJob.setPending(); 239 CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime, 240 endTime); 241 Configuration jobConf = null; 242 try { 243 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 244 } 245 catch (IOException e1) { 246 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1); 247 } 248 String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null); 249 String output = coordJob.getJobXml() + System.getProperty("line.separator") 250 + "***actions for instance***" + action; 251 return output; 252 } 253 } 254 catch (CoordinatorJobException cex) { 255 exceptionOccured = true; 256 LOG.warn("ERROR: ", cex); 257 throw new CommandException(cex); 258 } 259 catch (IllegalArgumentException iex) { 260 exceptionOccured = true; 261 LOG.warn("ERROR: ", iex); 262 throw new CommandException(ErrorCode.E1003, iex); 263 } 264 catch (Exception ex) { 265 exceptionOccured = true; 266 LOG.warn("ERROR: ", ex); 267 throw new CommandException(ErrorCode.E0803, ex); 268 } 269 finally { 270 if (exceptionOccured) { 271 if(coordJob.getId() == null || coordJob.getId().equalsIgnoreCase("")){ 272 coordJob.setStatus(CoordinatorJob.Status.FAILED); 273 coordJob.resetPending(); 274 } 275 } 276 } 277 278 LOG.info("ENDED Coordinator Submit jobId=" + jobId); 279 return jobId; 280 } 281 282 /** 283 * Read the application XML and validate against coordinator Schema 284 * 285 * @return validated coordinator XML 286 * @throws CoordinatorJobException thrown if unable to read or validate coordinator xml 287 */ 288 private String readAndValidateXml() throws CoordinatorJobException { 289 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH), 290 OozieClient.COORDINATOR_APP_PATH); 291 String coordXml = readDefinition(appPath); 292 validateXml(coordXml); 293 return coordXml; 294 } 295 296 /** 297 * Validate against Coordinator XSD file 298 * 299 * @param xmlContent : Input coordinator xml 300 * @throws CoordinatorJobException thrown if unable to validate coordinator xml 301 */ 302 private void validateXml(String xmlContent) throws CoordinatorJobException { 303 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR); 304 Validator validator = schema.newValidator(); 305 try { 306 validator.validate(new StreamSource(new StringReader(xmlContent))); 307 } 308 catch (SAXException ex) { 309 LOG.warn("SAXException :", ex); 310 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex); 311 } 312 catch (IOException ex) { 313 LOG.warn("IOException :", ex); 314 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex); 315 } 316 } 317 318 /** 319 * Read the application XML schema namespace 320 * 321 * @param xmlContent input coordinator xml 322 * @return app xml namespace 323 * @throws CoordinatorJobException 324 */ 325 private String readAppNamespace(String xmlContent) throws CoordinatorJobException { 326 try { 327 Element coordXmlElement = XmlUtils.parseXml(xmlContent); 328 Namespace ns = coordXmlElement.getNamespace(); 329 if (ns != null && bundleId != null && ns.getURI().equals(SchemaService.COORDINATOR_NAMESPACE_URI_1)) { 330 throw new CoordinatorJobException(ErrorCode.E1319, "bundle app can not submit coordinator namespace " 331 + SchemaService.COORDINATOR_NAMESPACE_URI_1 + ", please use 0.2 or later"); 332 } 333 if (ns != null) { 334 return ns.getURI(); 335 } 336 else { 337 throw new CoordinatorJobException(ErrorCode.E0700, "the application xml namespace is not given"); 338 } 339 } 340 catch (JDOMException ex) { 341 LOG.warn("JDOMException :", ex); 342 throw new CoordinatorJobException(ErrorCode.E0700, ex.getMessage(), ex); 343 } 344 } 345 346 /** 347 * Merge default configuration with user-defined configuration. 348 * 349 * @throws CommandException thrown if failed to read or merge configurations 350 */ 351 protected void mergeDefaultConfig() throws CommandException { 352 Path configDefault = null; 353 try { 354 String coordAppPathStr = conf.get(OozieClient.COORDINATOR_APP_PATH); 355 Path coordAppPath = new Path(coordAppPathStr); 356 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 357 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME); 358 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, 359 coordAppPath.toUri(), new Configuration()); 360 361 // app path could be a directory 362 if (!fs.isFile(coordAppPath)) { 363 configDefault = new Path(coordAppPath, CONFIG_DEFAULT); 364 } else { 365 configDefault = new Path(coordAppPath.getParent(), CONFIG_DEFAULT); 366 } 367 368 if (fs.exists(configDefault)) { 369 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 370 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 371 XConfiguration.injectDefaults(defaultConf, conf); 372 } 373 else { 374 LOG.info("configDefault Doesn't exist " + configDefault); 375 } 376 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 377 378 // Resolving all variables in the job properties. 379 // This ensures the Hadoop Configuration semantics is preserved. 380 XConfiguration resolvedVarsConf = new XConfiguration(); 381 for (Map.Entry<String, String> entry : conf) { 382 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 383 } 384 conf = resolvedVarsConf; 385 } 386 catch (IOException e) { 387 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 388 + configDefault, e); 389 } 390 catch (HadoopAccessorException e) { 391 throw new CommandException(e); 392 } 393 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 394 } 395 396 /** 397 * The method resolve all the variables that are defined in configuration. It also include the data set definition 398 * from dataset file into XML. 399 * 400 * @param appXml : Original job XML 401 * @param conf : Configuration of the job 402 * @param coordJob : Coordinator job bean to be populated. 403 * @return Resolved and modified job XML element. 404 * @throws CoordinatorJobException thrown if failed to resolve basic entities or include referred datasets 405 * @throws Exception thrown if failed to resolve basic entities or include referred datasets 406 */ 407 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob) 408 throws CoordinatorJobException, Exception { 409 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob); 410 includeDataSets(basicResolvedApp, conf); 411 return basicResolvedApp; 412 } 413 414 /** 415 * Insert data set into data-in and data-out tags. 416 * 417 * @param eAppXml : coordinator application XML 418 * @param eDatasets : DataSet XML 419 */ 420 @SuppressWarnings("unchecked") 421 private void insertDataSet(Element eAppXml, Element eDatasets) { 422 // Adding DS definition in the coordinator XML 423 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 424 if (inputList != null) { 425 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 426 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset")); 427 dataIn.getContent().add(0, eDataset); 428 } 429 } 430 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 431 if (outputList != null) { 432 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 433 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset")); 434 dataOut.getContent().add(0, eDataset); 435 } 436 } 437 } 438 439 /** 440 * Find a specific dataset from a list of Datasets. 441 * 442 * @param eDatasets : List of data sets 443 * @param name : queried data set name 444 * @return one Dataset element. otherwise throw Exception 445 */ 446 @SuppressWarnings("unchecked") 447 private static Element findDataSet(Element eDatasets, String name) { 448 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 449 if (eDataset.getAttributeValue("name").equals(name)) { 450 eDataset = (Element) eDataset.clone(); 451 eDataset.detach(); 452 return eDataset; 453 } 454 } 455 throw new RuntimeException("undefined dataset: " + name); 456 } 457 458 /** 459 * Initialize all the required EL Evaluators. 460 */ 461 protected void initEvaluators() { 462 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq"); 463 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs"); 464 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances"); 465 evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit"); 466 } 467 468 /** 469 * Resolve basic entities using job Configuration. 470 * 471 * @param conf :Job configuration 472 * @param appXml : Original job XML 473 * @param coordJob : Coordinator job bean to be populated. 474 * @return Resolved job XML element. 475 * @throws CoordinatorJobException thrown if failed to resolve basic entities 476 * @throws Exception thrown if failed to resolve basic entities 477 */ 478 @SuppressWarnings("unchecked") 479 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob) 480 throws CoordinatorJobException, Exception { 481 Element eAppXml = XmlUtils.parseXml(appXml); 482 // job's main attributes 483 // frequency 484 String val = resolveAttribute("frequency", eAppXml, evalFreq); 485 int ival = ParamChecker.checkInteger(val, "frequency"); 486 ParamChecker.checkGTZero(ival, "frequency"); 487 coordJob.setFrequency(ival); 488 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq 489 .getVariable("timeunit")); 490 addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); 491 // TimeUnit 492 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString())); 493 // End Of Duration 494 tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq 495 .getVariable("endOfDuration")); 496 addAnAttribute("end_of_duration", eAppXml, tmp.toString()); 497 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean 498 499 // Application name 500 if (this.coordName == null) { 501 val = resolveAttribute("name", eAppXml, evalNofuncs); 502 coordJob.setAppName(val); 503 } 504 else { 505 // this coord job is created from bundle 506 coordJob.setAppName(this.coordName); 507 } 508 509 // start time 510 val = resolveAttribute("start", eAppXml, evalNofuncs); 511 ParamChecker.checkUTC(val, "start"); 512 coordJob.setStartTime(DateUtils.parseDateUTC(val)); 513 // end time 514 val = resolveAttribute("end", eAppXml, evalNofuncs); 515 ParamChecker.checkUTC(val, "end"); 516 coordJob.setEndTime(DateUtils.parseDateUTC(val)); 517 // Time zone 518 val = resolveAttribute("timezone", eAppXml, evalNofuncs); 519 ParamChecker.checkTimeZone(val, "timezone"); 520 coordJob.setTimeZone(val); 521 522 // controls 523 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 524 if (val == "") { 525 val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL); 526 } 527 528 ival = ParamChecker.checkInteger(val, "timeout"); 529 if (ival < 0 || ival > Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600)) { 530 ival = Services.get().getConf().getInt(CONF_DEFAULT_MAX_TIMEOUT, 129600); 531 } 532 coordJob.setTimeout(ival); 533 534 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 535 if (val == null || val.isEmpty()) { 536 val = Services.get().getConf().get(CONF_DEFAULT_CONCURRENCY, "-1"); 537 } 538 ival = ParamChecker.checkInteger(val, "concurrency"); 539 coordJob.setConcurrency(ival); 540 541 val = resolveTagContents("throttle", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 542 if (val == null || val.isEmpty()) { 543 int defaultThrottle = Services.get().getConf().getInt(CONF_DEFAULT_THROTTLE, 12); 544 ival = defaultThrottle; 545 } 546 else { 547 ival = ParamChecker.checkInteger(val, "throttle"); 548 } 549 int maxQueue = Services.get().getConf().getInt(CONF_QUEUE_SIZE, 10000); 550 float factor = Services.get().getConf().getFloat(CONF_MAT_THROTTLING_FACTOR, 0.10f); 551 int maxThrottle = (int) (maxQueue * factor); 552 if (ival > maxThrottle || ival < 1) { 553 ival = maxThrottle; 554 } 555 LOG.debug("max throttle " + ival); 556 coordJob.setMatThrottling(ival); 557 558 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 559 if (val == "") { 560 val = Execution.FIFO.toString(); 561 } 562 coordJob.setExecution(Execution.valueOf(val)); 563 String[] acceptedVals = { Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString() }; 564 ParamChecker.isMember(val, acceptedVals, "execution"); 565 566 // datasets 567 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs); 568 // for each data set 569 resolveDataSets(eAppXml); 570 HashMap<String, String> dataNameList = new HashMap<String, String>(); 571 resolveIOEvents(eAppXml, dataNameList); 572 573 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 574 eAppXml.getNamespace()), evalNofuncs); 575 // TODO: If action or workflow tag is missing, NullPointerException will 576 // occur 577 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 578 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace()); 579 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList); 580 if (configElem != null) { 581 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 582 resolveTagContents("name", propElem, evalData); 583 // Want to check the data-integrity but don't want to modify the 584 // XML 585 // for properties only 586 Element tmpProp = (Element) propElem.clone(); 587 resolveTagContents("value", tmpProp, evalData); 588 } 589 } 590 resolveSLA(eAppXml, coordJob); 591 return eAppXml; 592 } 593 594 /** 595 * Resolve SLA events 596 * 597 * @param eAppXml job XML 598 * @param coordJob coordinator job bean 599 * @throws CommandException thrown if failed to resolve sla events 600 */ 601 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException { 602 Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info", 603 Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 604 605 if (eSla != null) { 606 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 607 try { 608 // EL evaluation 609 slaXml = evalSla.evaluate(slaXml, String.class); 610 // Validate against semantic SXD 611 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 612 } 613 catch (Exception e) { 614 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e); 615 } 616 } 617 } 618 619 /** 620 * Resolve input-events/data-in and output-events/data-out tags. 621 * 622 * @param eJob : Job element 623 * @throws CoordinatorJobException thrown if failed to resolve input and output events 624 */ 625 @SuppressWarnings("unchecked") 626 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException { 627 // Resolving input-events/data-in 628 // Clone the job and don't update anything in the original 629 Element eJob = (Element) eJobOrg.clone(); 630 Element inputList = eJob.getChild("input-events", eJob.getNamespace()); 631 if (inputList != null) { 632 TreeSet<String> eventNameSet = new TreeSet<String>(); 633 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) { 634 String dataInName = dataIn.getAttributeValue("name"); 635 dataNameList.put(dataInName, "data-in"); 636 // check whether there is any duplicate data-in name 637 if (eventNameSet.contains(dataInName)) { 638 throw new RuntimeException("Duplicate dataIn name " + dataInName); 639 } 640 else { 641 eventNameSet.add(dataInName); 642 } 643 resolveTagContents("instance", dataIn, evalInst); 644 resolveTagContents("start-instance", dataIn, evalInst); 645 resolveTagContents("end-instance", dataIn, evalInst); 646 } 647 } 648 // Resolving output-events/data-out 649 Element outputList = eJob.getChild("output-events", eJob.getNamespace()); 650 if (outputList != null) { 651 TreeSet<String> eventNameSet = new TreeSet<String>(); 652 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) { 653 String dataOutName = dataOut.getAttributeValue("name"); 654 dataNameList.put(dataOutName, "data-out"); 655 // check whether there is any duplicate data-out name 656 if (eventNameSet.contains(dataOutName)) { 657 throw new RuntimeException("Duplicate dataIn name " + dataOutName); 658 } 659 else { 660 eventNameSet.add(dataOutName); 661 } 662 resolveTagContents("instance", dataOut, evalInst); 663 } 664 } 665 666 } 667 668 /** 669 * Add an attribute into XML element. 670 * 671 * @param attrName :attribute name 672 * @param elem : Element to add attribute 673 * @param value :Value of attribute 674 */ 675 private void addAnAttribute(String attrName, Element elem, String value) { 676 elem.setAttribute(attrName, value); 677 } 678 679 /** 680 * Resolve datasets using job configuration. 681 * 682 * @param eAppXml : Job Element XML 683 * @throws Exception thrown if failed to resolve datasets 684 */ 685 @SuppressWarnings("unchecked") 686 private void resolveDataSets(Element eAppXml) throws Exception { 687 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace()); 688 if (datasetList != null) { 689 690 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace()); 691 resolveDataSets(dsElems); 692 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 693 eAppXml.getNamespace()), evalNofuncs); 694 } 695 } 696 697 /** 698 * Resolve datasets using job configuration. 699 * 700 * @param dsElems : Data set XML element. 701 * @throws CoordinatorJobException thrown if failed to resolve datasets 702 */ 703 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException { 704 for (Element dsElem : dsElems) { 705 // Setting up default TimeUnit and EndOFDuraion 706 evalFreq.setVariable("timeunit", TimeUnit.MINUTE); 707 evalFreq.setVariable("endOfDuration", TimeUnit.NONE); 708 709 String val = resolveAttribute("frequency", dsElem, evalFreq); 710 int ival = ParamChecker.checkInteger(val, "frequency"); 711 ParamChecker.checkGTZero(ival, "frequency"); 712 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE 713 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString()); 714 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE 715 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString()); 716 val = resolveAttribute("initial-instance", dsElem, evalNofuncs); 717 ParamChecker.checkUTC(val, "initial-instance"); 718 val = resolveAttribute("timezone", dsElem, evalNofuncs); 719 ParamChecker.checkTimeZone(val, "timezone"); 720 resolveTagContents("uri-template", dsElem, evalNofuncs); 721 resolveTagContents("done-flag", dsElem, evalNofuncs); 722 } 723 } 724 725 /** 726 * Resolve the content of a tag. 727 * 728 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout> 729 * @param elem : Element where the tag exists. 730 * @param eval : EL evealuator 731 * @return Resolved tag content. 732 * @throws CoordinatorJobException thrown if failed to resolve tag content 733 */ 734 @SuppressWarnings("unchecked") 735 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 736 String ret = ""; 737 if (elem != null) { 738 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) { 739 if (tagElem != null) { 740 String updated; 741 try { 742 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim()); 743 744 } 745 catch (Exception e) { 746 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 747 } 748 tagElem.removeContent(); 749 tagElem.addContent(updated); 750 ret += updated; 751 } 752 } 753 } 754 return ret; 755 } 756 757 /** 758 * Resolve an attribute value. 759 * 760 * @param attrName : Attribute name. 761 * @param elem : XML Element where attribute is defiend 762 * @param eval : ELEvaluator used to resolve 763 * @return Resolved attribute value 764 * @throws CoordinatorJobException thrown if failed to resolve an attribute value 765 */ 766 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 767 Attribute attr = elem.getAttribute(attrName); 768 String val = null; 769 if (attr != null) { 770 try { 771 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim()); 772 } 773 catch (Exception e) { 774 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 775 } 776 attr.setValue(val); 777 } 778 return val; 779 } 780 781 /** 782 * Include referred datasets into XML. 783 * 784 * @param resolvedXml : Job XML element. 785 * @param conf : Job configuration 786 * @throws CoordinatorJobException thrown if failed to include referred datasets into XML 787 */ 788 @SuppressWarnings("unchecked") 789 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException { 790 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace()); 791 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace()); 792 List<String> dsList = new ArrayList<String>(); 793 if (datasets != null) { 794 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) { 795 String incDSFile = includeElem.getTextTrim(); 796 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace()); 797 } 798 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) { 799 String dsName = e.getAttributeValue("name"); 800 if (dsList.contains(dsName)) {// Override with this DS 801 // Remove old DS 802 removeDataSet(allDataSets, dsName); 803 } 804 else { 805 dsList.add(dsName); 806 } 807 allDataSets.addContent((Element) e.clone()); 808 } 809 } 810 insertDataSet(resolvedXml, allDataSets); 811 resolvedXml.removeChild("datasets", resolvedXml.getNamespace()); 812 } 813 814 /** 815 * Include one dataset file. 816 * 817 * @param incDSFile : Include data set filename. 818 * @param dsList :List of dataset names to verify the duplicate. 819 * @param allDataSets : Element that includes all dataset definitions. 820 * @param dsNameSpace : Data set name space 821 * @throws CoordinatorJobException thrown if failed to include one dataset file 822 */ 823 @SuppressWarnings("unchecked") 824 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace) 825 throws CoordinatorJobException { 826 Element tmpDataSets = null; 827 try { 828 String dsXml = readDefinition(incDSFile); 829 LOG.debug("DSFILE :" + incDSFile + "\n" + dsXml); 830 tmpDataSets = XmlUtils.parseXml(dsXml); 831 } 832 catch (JDOMException e) { 833 LOG.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage()); 834 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage()); 835 } 836 resolveDataSets(tmpDataSets.getChildren("dataset")); 837 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) { 838 String dsName = e.getAttributeValue("name"); 839 if (dsList.contains(dsName)) { 840 throw new RuntimeException("Duplicate Dataset " + dsName); 841 } 842 dsList.add(dsName); 843 Element tmp = (Element) e.clone(); 844 // TODO: Don't like to over-write the external/include DS's namespace 845 tmp.setNamespace(dsNameSpace); 846 tmp.getChild("uri-template").setNamespace(dsNameSpace); 847 if (e.getChild("done-flag") != null) { 848 tmp.getChild("done-flag").setNamespace(dsNameSpace); 849 } 850 allDataSets.addContent(tmp); 851 } 852 // nested include 853 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) { 854 String incFile = includeElem.getTextTrim(); 855 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace); 856 } 857 } 858 859 /** 860 * Remove a dataset from a list of dataset. 861 * 862 * @param eDatasets : List of dataset 863 * @param name : Dataset name to be removed. 864 */ 865 @SuppressWarnings("unchecked") 866 private static void removeDataSet(Element eDatasets, String name) { 867 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 868 if (eDataset.getAttributeValue("name").equals(name)) { 869 eDataset.detach(); 870 } 871 } 872 throw new RuntimeException("undefined dataset: " + name); 873 } 874 875 /** 876 * Read coordinator definition. 877 * 878 * @param appPath application path. 879 * @return coordinator definition. 880 * @throws CoordinatorJobException thrown if the definition could not be read. 881 */ 882 protected String readDefinition(String appPath) throws CoordinatorJobException { 883 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 884 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME); 885 // Configuration confHadoop = CoordUtils.getHadoopConf(conf); 886 try { 887 URI uri = new URI(appPath); 888 LOG.debug("user =" + user + " group =" + group); 889 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri, 890 new Configuration()); 891 Path appDefPath = null; 892 893 // app path could be a directory 894 Path path = new Path(uri.getPath()); 895 // check file exists for dataset include file, app xml already checked 896 if (!fs.exists(path)) { 897 throw new URISyntaxException(path.toString(), "path not existed : " + path.toString()); 898 } 899 if (!fs.isFile(path)) { 900 appDefPath = new Path(path, COORDINATOR_XML_FILE); 901 } else { 902 appDefPath = path; 903 } 904 905 Reader reader = new InputStreamReader(fs.open(appDefPath)); 906 StringWriter writer = new StringWriter(); 907 IOUtils.copyCharStream(reader, writer); 908 return writer.toString(); 909 } 910 catch (IOException ex) { 911 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 912 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 913 } 914 catch (URISyntaxException ex) { 915 LOG.warn("URISyException :" + ex.getMessage()); 916 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex); 917 } 918 catch (HadoopAccessorException ex) { 919 throw new CoordinatorJobException(ex); 920 } 921 catch (Exception ex) { 922 LOG.warn("Exception :", ex); 923 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); 924 } 925 } 926 927 /** 928 * Write a coordinator job into database 929 * 930 * @param eJob : XML element of job 931 * @param coordJob : Coordinator job bean 932 * @return Job id 933 * @throws CommandException thrown if unable to save coordinator job to db 934 */ 935 private String storeToDB(Element eJob, CoordinatorJobBean coordJob) throws CommandException { 936 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR); 937 coordJob.setId(jobId); 938 coordJob.setAuthToken(this.authToken); 939 940 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH)); 941 coordJob.setCreatedTime(new Date()); 942 coordJob.setUser(conf.get(OozieClient.USER_NAME)); 943 coordJob.setGroup(conf.get(OozieClient.GROUP_NAME)); 944 coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 945 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString()); 946 coordJob.setLastActionNumber(0); 947 coordJob.setLastModifiedTime(new Date()); 948 949 if (!dryrun) { 950 coordJob.setLastModifiedTime(new Date()); 951 try { 952 jpaService.execute(new CoordJobInsertJPAExecutor(coordJob)); 953 } 954 catch (JPAExecutorException je) { 955 throw new CommandException(je); 956 } 957 } 958 return jobId; 959 } 960 961 /* (non-Javadoc) 962 * @see org.apache.oozie.command.XCommand#getEntityKey() 963 */ 964 @Override 965 protected String getEntityKey() { 966 return null; 967 } 968 969 /* (non-Javadoc) 970 * @see org.apache.oozie.command.XCommand#isLockRequired() 971 */ 972 @Override 973 protected boolean isLockRequired() { 974 return false; 975 } 976 977 /* (non-Javadoc) 978 * @see org.apache.oozie.command.XCommand#loadState() 979 */ 980 @Override 981 protected void loadState() throws CommandException { 982 jpaService = Services.get().get(JPAService.class); 983 if (jpaService == null) { 984 throw new CommandException(ErrorCode.E0610); 985 } 986 coordJob = new CoordinatorJobBean(); 987 if (this.bundleId != null) { 988 // this coord job is created from bundle 989 coordJob.setBundleId(this.bundleId); 990 // first use bundle id if submit thru bundle 991 logInfo.setParameter(DagXLogInfoService.JOB, this.bundleId); 992 LogUtils.setLogInfo(logInfo); 993 } 994 if (this.coordName != null) { 995 // this coord job is created from bundle 996 coordJob.setAppName(this.coordName); 997 } 998 setJob(coordJob); 999 1000 } 1001 1002 /* (non-Javadoc) 1003 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 1004 */ 1005 @Override 1006 protected void verifyPrecondition() throws CommandException { 1007 1008 } 1009 1010 /* (non-Javadoc) 1011 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 1012 */ 1013 @Override 1014 public void notifyParent() throws CommandException { 1015 // update bundle action 1016 if (coordJob.getBundleId() != null) { 1017 LOG.debug("Updating bundle record: " + coordJob.getBundleId() + " for coord id: " + coordJob.getId()); 1018 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 1019 bundleStatusUpdate.call(); 1020 } 1021 } 1022 1023 /* (non-Javadoc) 1024 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 1025 */ 1026 @Override 1027 public void updateJob() throws CommandException { 1028 } 1029 1030 /* (non-Javadoc) 1031 * @see org.apache.oozie.command.TransitionXCommand#getJob() 1032 */ 1033 @Override 1034 public Job getJob() { 1035 return coordJob; 1036 } 1037 }