001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.InputStreamReader; 022 import java.io.Reader; 023 import java.io.StringReader; 024 import java.io.StringWriter; 025 import java.net.URI; 026 import java.net.URISyntaxException; 027 import java.util.ArrayList; 028 import java.util.Date; 029 import java.util.HashMap; 030 import java.util.HashSet; 031 import java.util.List; 032 import java.util.Set; 033 import java.util.TreeSet; 034 035 import javax.xml.transform.stream.StreamSource; 036 import javax.xml.validation.Validator; 037 038 import org.apache.hadoop.conf.Configuration; 039 import org.apache.hadoop.fs.FileSystem; 040 import org.apache.hadoop.fs.Path; 041 import org.apache.oozie.CoordinatorJobBean; 042 import org.apache.oozie.ErrorCode; 043 import org.apache.oozie.client.CoordinatorJob; 044 import org.apache.oozie.client.OozieClient; 045 import org.apache.oozie.client.CoordinatorJob.Execution; 046 import org.apache.oozie.command.CommandException; 047 import org.apache.oozie.coord.CoordELEvaluator; 048 import org.apache.oozie.coord.CoordELFunctions; 049 import org.apache.oozie.coord.CoordUtils; 050 import org.apache.oozie.coord.CoordinatorJobException; 051 import org.apache.oozie.coord.TimeUnit; 052 import org.apache.oozie.service.DagXLogInfoService; 053 import org.apache.oozie.service.HadoopAccessorException; 054 import org.apache.oozie.service.SchemaService; 055 import org.apache.oozie.service.Service; 056 import org.apache.oozie.service.Services; 057 import org.apache.oozie.service.UUIDService; 058 import org.apache.oozie.service.HadoopAccessorService; 059 import org.apache.oozie.service.WorkflowAppService; 060 import org.apache.oozie.service.SchemaService.SchemaName; 061 import org.apache.oozie.service.UUIDService.ApplicationType; 062 import org.apache.oozie.store.CoordinatorStore; 063 import org.apache.oozie.store.StoreException; 064 import org.apache.oozie.util.DateUtils; 065 import org.apache.oozie.util.ELEvaluator; 066 import org.apache.oozie.util.IOUtils; 067 import org.apache.oozie.util.ParamChecker; 068 import org.apache.oozie.util.PropertiesUtils; 069 import org.apache.oozie.util.XConfiguration; 070 import org.apache.oozie.util.XLog; 071 import org.apache.oozie.util.XmlUtils; 072 import org.apache.oozie.workflow.WorkflowException; 073 import org.jdom.Attribute; 074 import org.jdom.Element; 075 import org.jdom.JDOMException; 076 import org.jdom.Namespace; 077 import org.xml.sax.SAXException; 078 079 /** 080 * This class provides the functionalities to resolve a coordinator job XML and write the job information into a DB 081 * table. <p/> Specifically it performs the following functions: 1. Resolve all the variables or properties using job 082 * configurations. 2. Insert all datasets definition as part of the <data-in> and <data-out> tags. 3. Validate the XML 083 * at runtime. 084 */ 085 public class CoordSubmitCommand extends CoordinatorCommand<String> { 086 087 private Configuration conf; 088 private String authToken; 089 private boolean dryrun; 090 091 public static final String CONFIG_DEFAULT = "coord-config-default.xml"; 092 public static final String COORDINATOR_XML_FILE = "coordinator.xml"; 093 094 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 095 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 096 /** 097 * Default timeout for normal jobs, in minutes, after which coordinator input check will timeout 098 */ 099 public static final String CONF_DEFAULT_TIMEOUT_NORMAL = Service.CONF_PREFIX + "coord.normal.default.timeout"; 100 101 private XLog log = XLog.getLog(getClass()); 102 private ELEvaluator evalFreq = null; 103 private ELEvaluator evalNofuncs = null; 104 private ELEvaluator evalData = null; 105 private ELEvaluator evalInst = null; 106 private ELEvaluator evalSla = null; 107 108 static { 109 String[] badUserProps = {PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 110 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 111 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 112 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 113 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS}; 114 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 115 116 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI, 117 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME}; 118 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 119 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 120 } 121 122 /** 123 * Constructor to create the Coordinator Submit Command. 124 * 125 * @param conf : Configuration for Coordinator job 126 * @param authToken : To be used for authentication 127 */ 128 public CoordSubmitCommand(Configuration conf, String authToken) { 129 super("coord_submit", "coord_submit", 1, XLog.STD); 130 this.conf = ParamChecker.notNull(conf, "conf"); 131 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 132 } 133 134 public CoordSubmitCommand(boolean dryrun, Configuration conf, String authToken) { 135 super("coord_submit", "coord_submit", 1, XLog.STD, dryrun); 136 this.conf = ParamChecker.notNull(conf, "conf"); 137 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 138 this.dryrun = dryrun; 139 // TODO Auto-generated constructor stub 140 } 141 142 /* 143 * (non-Javadoc) 144 * 145 * @see org.apache.oozie.command.Command#call(org.apache.oozie.store.Store) 146 */ 147 @Override 148 protected String call(CoordinatorStore store) throws StoreException, CommandException { 149 String jobId = null; 150 log.info("STARTED Coordinator Submit"); 151 incrJobCounter(1); 152 CoordinatorJobBean coordJob = new CoordinatorJobBean(); 153 try { 154 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN)); 155 mergeDefaultConfig(); 156 157 String appXml = readAndValidateXml(); 158 coordJob.setOrigJobXml(appXml); 159 log.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 160 appXml = XmlUtils.removeComments(appXml); 161 initEvaluators(); 162 Element eJob = basicResolveAndIncludeDS(appXml, conf, coordJob); 163 log.debug("jobXml after all validation " + XmlUtils.prettyPrint(eJob).toString()); 164 165 jobId = storeToDB(eJob, store, coordJob); 166 // log JOB info for coordinator jobs 167 setLogInfo(coordJob); 168 log = XLog.getLog(getClass()); 169 170 if (!dryrun) { 171 // submit a command to materialize jobs for the next 1 hour (3600 secs) 172 // so we don't wait 10 mins for the Service to run. 173 queueCallable(new CoordJobMatLookupCommand(jobId, 3600), 100); 174 } 175 else { 176 Date startTime = coordJob.getStartTime(); 177 long startTimeMilli = startTime.getTime(); 178 long endTimeMilli = startTimeMilli + (3600 * 1000); 179 Date jobEndTime = coordJob.getEndTime(); 180 Date endTime = new Date(endTimeMilli); 181 if (endTime.compareTo(jobEndTime) > 0) { 182 endTime = jobEndTime; 183 } 184 jobId = coordJob.getId(); 185 log.info("[" + jobId + "]: Update status to PREMATER"); 186 coordJob.setStatus(CoordinatorJob.Status.PREMATER); 187 CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime, 188 endTime); 189 Configuration jobConf = null; 190 try { 191 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 192 } 193 catch (IOException e1) { 194 log.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1); 195 } 196 String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null); 197 String output = coordJob.getJobXml() + System.getProperty("line.separator") 198 + "***actions for instance***" + action; 199 return output; 200 } 201 } 202 catch (CoordinatorJobException ex) { 203 log.warn("ERROR: ", ex); 204 throw new CommandException(ex); 205 } 206 catch (IllegalArgumentException iex) { 207 log.warn("ERROR: ", iex); 208 throw new CommandException(ErrorCode.E1003, iex); 209 } 210 catch (Exception ex) {// TODO 211 log.warn("ERROR: ", ex); 212 throw new CommandException(ErrorCode.E0803, ex); 213 } 214 log.info("ENDED Coordinator Submit jobId=" + jobId); 215 return jobId; 216 } 217 218 /** 219 * Read the application XML and validate against coordinator Schema 220 * 221 * @return validated coordinator XML 222 * @throws CoordinatorJobException 223 */ 224 private String readAndValidateXml() throws CoordinatorJobException { 225 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.COORDINATOR_APP_PATH), 226 OozieClient.COORDINATOR_APP_PATH);// TODO: COORDINATOR_APP_PATH 227 String coordXml = readDefinition(appPath); 228 validateXml(coordXml); 229 return coordXml; 230 } 231 232 /** 233 * Validate against Coordinator XSD file 234 * 235 * @param xmlContent : Input coordinator xml 236 * @throws CoordinatorJobException 237 */ 238 private void validateXml(String xmlContent) throws CoordinatorJobException { 239 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.COORDINATOR); 240 Validator validator = schema.newValidator(); 241 // log.warn("XML " + xmlContent); 242 try { 243 validator.validate(new StreamSource(new StringReader(xmlContent))); 244 } 245 catch (SAXException ex) { 246 log.warn("SAXException :", ex); 247 throw new CoordinatorJobException(ErrorCode.E0701, ex.getMessage(), ex); 248 } 249 catch (IOException ex) { 250 // ex.printStackTrace(); 251 log.warn("IOException :", ex); 252 throw new CoordinatorJobException(ErrorCode.E0702, ex.getMessage(), ex); 253 } 254 } 255 256 /** 257 * Merge default configuration with user-defined configuration. 258 * 259 * @throws CommandException 260 */ 261 protected void mergeDefaultConfig() throws CommandException { 262 Path coordAppDir = new Path(conf.get(OozieClient.COORDINATOR_APP_PATH)).getParent(); 263 Path configDefault = new Path(coordAppDir, CONFIG_DEFAULT); 264 // Configuration fsConfig = new Configuration(); 265 // log.warn("CONFIG :" + configDefault.toUri()); 266 Configuration fsConfig = CoordUtils.getHadoopConf(conf); 267 FileSystem fs; 268 // TODO: which conf? 269 try { 270 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 271 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME); 272 fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, configDefault.toUri(), 273 conf); 274 if (fs.exists(configDefault)) { 275 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 276 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 277 XConfiguration.injectDefaults(defaultConf, conf); 278 } 279 else { 280 log.info("configDefault Doesn't exist " + configDefault); 281 } 282 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 283 } 284 catch (IOException e) { 285 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 286 + configDefault, e); 287 } 288 catch (HadoopAccessorException e) { 289 throw new CommandException(e); 290 } 291 log.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 292 } 293 294 /** 295 * The method resolve all the variables that are defined in configuration. It also include the data set definition 296 * from dataset file into XML. 297 * 298 * @param appXml : Original job XML 299 * @param conf : Configuration of the job 300 * @param coordJob : Coordinator job bean to be populated. 301 * @return : Resolved and modified job XML element. 302 * @throws Exception 303 */ 304 public Element basicResolveAndIncludeDS(String appXml, Configuration conf, CoordinatorJobBean coordJob) 305 throws CoordinatorJobException, Exception { 306 Element basicResolvedApp = resolveInitial(conf, appXml, coordJob); 307 includeDataSets(basicResolvedApp, conf); 308 return basicResolvedApp; 309 } 310 311 /** 312 * Insert data set into data-in and data-out tags. 313 * 314 * @param eAppXml : coordinator application XML 315 * @param eDatasets : DataSet XML 316 * @return updated application 317 */ 318 private void insertDataSet(Element eAppXml, Element eDatasets) { 319 // Adding DS definition in the coordinator XML 320 Element inputList = eAppXml.getChild("input-events", eAppXml.getNamespace()); 321 if (inputList != null) { 322 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eAppXml.getNamespace())) { 323 Element eDataset = findDataSet(eDatasets, dataIn.getAttributeValue("dataset")); 324 dataIn.getContent().add(0, eDataset); 325 } 326 } 327 Element outputList = eAppXml.getChild("output-events", eAppXml.getNamespace()); 328 if (outputList != null) { 329 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eAppXml.getNamespace())) { 330 Element eDataset = findDataSet(eDatasets, dataOut.getAttributeValue("dataset")); 331 dataOut.getContent().add(0, eDataset); 332 } 333 } 334 } 335 336 /** 337 * Find a specific dataset from a list of Datasets. 338 * 339 * @param eDatasets : List of data sets 340 * @param name : queried data set name 341 * @return one Dataset element. otherwise throw Exception 342 */ 343 private static Element findDataSet(Element eDatasets, String name) { 344 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 345 if (eDataset.getAttributeValue("name").equals(name)) { 346 eDataset = (Element) eDataset.clone(); 347 eDataset.detach(); 348 return eDataset; 349 } 350 } 351 throw new RuntimeException("undefined dataset: " + name); 352 } 353 354 /** 355 * Initialize all the required EL Evaluators. 356 */ 357 protected void initEvaluators() { 358 evalFreq = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-freq"); 359 evalNofuncs = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-nofuncs"); 360 evalInst = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-job-submit-instances"); 361 evalSla = CoordELEvaluator.createELEvaluatorForGroup(conf, "coord-sla-submit"); 362 } 363 364 /** 365 * Resolve basic entities using job Configuration. 366 * 367 * @param conf :Job configuration 368 * @param appXml : Original job XML 369 * @param coordJob : Coordinator job bean to be populated. 370 * @return Resolved job XML element. 371 * @throws Exception 372 */ 373 protected Element resolveInitial(Configuration conf, String appXml, CoordinatorJobBean coordJob) 374 throws CoordinatorJobException, Exception { 375 Element eAppXml = XmlUtils.parseXml(appXml); 376 // job's main attributes 377 // frequency 378 String val = resolveAttribute("frequency", eAppXml, evalFreq); 379 int ival = ParamChecker.checkInteger(val, "frequency"); 380 ParamChecker.checkGTZero(ival, "frequency"); 381 coordJob.setFrequency(ival); 382 TimeUnit tmp = (evalFreq.getVariable("timeunit") == null) ? TimeUnit.MINUTE : ((TimeUnit) evalFreq 383 .getVariable("timeunit")); 384 addAnAttribute("freq_timeunit", eAppXml, tmp.toString()); // TODO: Store 385 // TimeUnit 386 coordJob.setTimeUnit(CoordinatorJob.Timeunit.valueOf(tmp.toString())); 387 // End Of Duration 388 tmp = evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE : ((TimeUnit) evalFreq 389 .getVariable("endOfDuration")); 390 addAnAttribute("end_of_duration", eAppXml, tmp.toString()); 391 // coordJob.setEndOfDuration(tmp) // TODO: Add new attribute in Job bean 392 393 // start time 394 val = resolveAttribute("start", eAppXml, evalNofuncs); 395 ParamChecker.checkUTC(val, "start"); 396 coordJob.setStartTime(DateUtils.parseDateUTC(val)); 397 // end time 398 val = resolveAttribute("end", eAppXml, evalNofuncs); 399 ParamChecker.checkUTC(val, "end"); 400 coordJob.setEndTime(DateUtils.parseDateUTC(val)); 401 // Time zone 402 val = resolveAttribute("timezone", eAppXml, evalNofuncs); 403 ParamChecker.checkTimeZone(val, "timezone"); 404 coordJob.setTimeZone(val); 405 406 // controls 407 val = resolveTagContents("timeout", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 408 if (val == "") { 409 val = Services.get().getConf().get(CONF_DEFAULT_TIMEOUT_NORMAL); 410 } 411 412 ival = ParamChecker.checkInteger(val, "timeout"); 413 // ParamChecker.checkGEZero(ival, "timeout"); 414 coordJob.setTimeout(ival); 415 val = resolveTagContents("concurrency", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 416 if (val == "") { 417 val = "-1"; 418 } 419 ival = ParamChecker.checkInteger(val, "concurrency"); 420 // ParamChecker.checkGEZero(ival, "concurrency"); 421 coordJob.setConcurrency(ival); 422 val = resolveTagContents("execution", eAppXml.getChild("controls", eAppXml.getNamespace()), evalNofuncs); 423 if (val == "") { 424 val = Execution.FIFO.toString(); 425 } 426 coordJob.setExecution(Execution.valueOf(val)); 427 String[] acceptedVals = {Execution.LIFO.toString(), Execution.FIFO.toString(), Execution.LAST_ONLY.toString()}; 428 ParamChecker.isMember(val, acceptedVals, "execution"); 429 430 // datasets 431 resolveTagContents("include", eAppXml.getChild("datasets", eAppXml.getNamespace()), evalNofuncs); 432 // for each data set 433 resolveDataSets(eAppXml); 434 HashMap<String, String> dataNameList = new HashMap<String, String>(); 435 resolveIOEvents(eAppXml, dataNameList); 436 437 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 438 eAppXml.getNamespace()), evalNofuncs); 439 // TODO: If action or workflow tag is missing, NullPointerException will 440 // occur 441 Element configElem = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 442 eAppXml.getNamespace()).getChild("configuration", eAppXml.getNamespace()); 443 evalData = CoordELEvaluator.createELEvaluatorForDataEcho(conf, "coord-job-submit-data", dataNameList); 444 if (configElem != null) { 445 for (Element propElem : (List<Element>) configElem.getChildren("property", configElem.getNamespace())) { 446 resolveTagContents("name", propElem, evalData); 447 // log.warn("Value :"); 448 // Want to check the data-integrity but don't want to modify the 449 // XML 450 // for properties only 451 Element tmpProp = (Element) propElem.clone(); 452 resolveTagContents("value", tmpProp, evalData); 453 // val = resolveTagContents("value", propElem, evalData); 454 // log.warn("Value OK :" + val); 455 } 456 } 457 resolveSLA(eAppXml, coordJob); 458 return eAppXml; 459 } 460 461 private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws CommandException { 462 // String prefix = XmlUtils.getNamespacePrefix(eAppXml, 463 // SchemaService.SLA_NAME_SPACE_URI); 464 Element eSla = eAppXml.getChild("action", eAppXml.getNamespace()).getChild("info", 465 Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI)); 466 467 if (eSla != null) { 468 String slaXml = XmlUtils.prettyPrint(eSla).toString(); 469 try { 470 // EL evaluation 471 slaXml = evalSla.evaluate(slaXml, String.class); 472 // Validate against semantic SXD 473 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL); 474 } 475 catch (Exception e) { 476 throw new CommandException(ErrorCode.E1004, "Validation ERROR :" + e.getMessage(), e); 477 } 478 } 479 } 480 481 /** 482 * Resolve input-events/data-in and output-events/data-out tags. 483 * 484 * @param eJob : Job element 485 * @throws CoordinatorJobException 486 */ 487 private void resolveIOEvents(Element eJobOrg, HashMap<String, String> dataNameList) throws CoordinatorJobException { 488 // Resolving input-events/data-in 489 // Clone the job and don't update anything in the original 490 Element eJob = (Element) eJobOrg.clone(); 491 Element inputList = eJob.getChild("input-events", eJob.getNamespace()); 492 if (inputList != null) { 493 TreeSet<String> eventNameSet = new TreeSet<String>(); 494 for (Element dataIn : (List<Element>) inputList.getChildren("data-in", eJob.getNamespace())) { 495 String dataInName = dataIn.getAttributeValue("name"); 496 dataNameList.put(dataInName, "data-in"); 497 // check whether there is any duplicate data-in name 498 if (eventNameSet.contains(dataInName)) { 499 throw new RuntimeException("Duplicate dataIn name " + dataInName); 500 } 501 else { 502 eventNameSet.add(dataInName); 503 } 504 resolveTagContents("instance", dataIn, evalInst); 505 resolveTagContents("start-instance", dataIn, evalInst); 506 resolveTagContents("end-instance", dataIn, evalInst); 507 } 508 } 509 // Resolving output-events/data-out 510 Element outputList = eJob.getChild("output-events", eJob.getNamespace()); 511 if (outputList != null) { 512 TreeSet<String> eventNameSet = new TreeSet<String>(); 513 for (Element dataOut : (List<Element>) outputList.getChildren("data-out", eJob.getNamespace())) { 514 String dataOutName = dataOut.getAttributeValue("name"); 515 dataNameList.put(dataOutName, "data-out"); 516 // check whether there is any duplicate data-out name 517 if (eventNameSet.contains(dataOutName)) { 518 throw new RuntimeException("Duplicate dataIn name " + dataOutName); 519 } 520 else { 521 eventNameSet.add(dataOutName); 522 } 523 resolveTagContents("instance", dataOut, evalInst); 524 } 525 } 526 527 } 528 529 /** 530 * Add an attribute into XML element. 531 * 532 * @param attrName :attribute name 533 * @param elem : Element to add attribute 534 * @param value :Value of attribute 535 */ 536 private void addAnAttribute(String attrName, Element elem, String value) { 537 elem.setAttribute(attrName, value); 538 } 539 540 /** 541 * Resolve Data set using job configuration. 542 * 543 * @param eAppXml : Job Element XML 544 * @throws Exception 545 */ 546 private void resolveDataSets(Element eAppXml) throws Exception { 547 Element datasetList = eAppXml.getChild("datasets", eAppXml.getNamespace()); 548 if (datasetList != null) { 549 550 List<Element> dsElems = datasetList.getChildren("dataset", eAppXml.getNamespace()); 551 resolveDataSets(dsElems); 552 resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow", 553 eAppXml.getNamespace()), evalNofuncs); 554 } 555 } 556 557 /** 558 * Resolve Data set using job configuration. 559 * 560 * @param dsElems : Data set XML element. 561 * @throws CoordinatorJobException 562 * @throws Exception 563 */ 564 private void resolveDataSets(List<Element> dsElems) throws CoordinatorJobException /* 565 * throws 566 * Exception 567 */ { 568 for (Element dsElem : dsElems) { 569 // Setting up default TimeUnit and EndOFDuraion 570 evalFreq.setVariable("timeunit", TimeUnit.MINUTE); 571 evalFreq.setVariable("endOfDuration", TimeUnit.NONE); 572 573 String val = resolveAttribute("frequency", dsElem, evalFreq); 574 int ival = ParamChecker.checkInteger(val, "frequency"); 575 ParamChecker.checkGTZero(ival, "frequency"); 576 addAnAttribute("freq_timeunit", dsElem, evalFreq.getVariable("timeunit") == null ? TimeUnit.MINUTE 577 .toString() : ((TimeUnit) evalFreq.getVariable("timeunit")).toString()); 578 addAnAttribute("end_of_duration", dsElem, evalFreq.getVariable("endOfDuration") == null ? TimeUnit.NONE 579 .toString() : ((TimeUnit) evalFreq.getVariable("endOfDuration")).toString()); 580 val = resolveAttribute("initial-instance", dsElem, evalNofuncs); 581 ParamChecker.checkUTC(val, "initial-instance"); 582 val = resolveAttribute("timezone", dsElem, evalNofuncs); 583 ParamChecker.checkTimeZone(val, "timezone"); 584 resolveTagContents("uri-template", dsElem, evalNofuncs); 585 resolveTagContents("done-flag", dsElem, evalNofuncs); 586 } 587 } 588 589 /** 590 * Resolve the content of a tag. 591 * 592 * @param tagName : Tag name of job XML i.e. <timeout> 10 </timeout> 593 * @param elem : Element where the tag exists. 594 * @param eval : 595 * @return Resolved tag content. 596 * @throws CoordinatorJobException 597 */ 598 private String resolveTagContents(String tagName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 599 String ret = ""; 600 if (elem != null) { 601 for (Element tagElem : (List<Element>) elem.getChildren(tagName, elem.getNamespace())) { 602 if (tagElem != null) { 603 String updated; 604 try { 605 updated = CoordELFunctions.evalAndWrap(eval, tagElem.getText().trim()); 606 607 } 608 catch (Exception e) { 609 // e.printStackTrace(); 610 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 611 } 612 tagElem.removeContent(); 613 tagElem.addContent(updated); 614 ret += updated; 615 } 616 /* 617 * else { //TODO: unlike event } 618 */ 619 } 620 } 621 return ret; 622 } 623 624 /** 625 * Resolve an attribute value. 626 * 627 * @param attrName : Attribute name. 628 * @param elem : XML Element where attribute is defiend 629 * @param eval : ELEvaluator used to resolve 630 * @return Resolved attribute value 631 * @throws CoordinatorJobException 632 */ 633 private String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException { 634 Attribute attr = elem.getAttribute(attrName); 635 String val = null; 636 if (attr != null) { 637 try { 638 val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim()); 639 640 } 641 catch (Exception e) { 642 // e.printStackTrace(); 643 throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e); 644 } 645 attr.setValue(val); 646 } 647 return val; 648 } 649 650 /** 651 * Include referred Datasets into XML. 652 * 653 * @param resolvedXml : Job XML element. 654 * @param conf : Job configuration 655 * @throws CoordinatorJobException 656 */ 657 protected void includeDataSets(Element resolvedXml, Configuration conf) throws CoordinatorJobException 658 /* throws Exception */ { 659 Element datasets = resolvedXml.getChild("datasets", resolvedXml.getNamespace()); 660 Element allDataSets = new Element("all_datasets", resolvedXml.getNamespace()); 661 List<String> dsList = new ArrayList<String>(); 662 if (datasets != null) { 663 for (Element includeElem : (List<Element>) datasets.getChildren("include", datasets.getNamespace())) { 664 String incDSFile = includeElem.getTextTrim(); 665 // log.warn(" incDSFile " + incDSFile); 666 includeOneDSFile(incDSFile, dsList, allDataSets, datasets.getNamespace()); 667 } 668 for (Element e : (List<Element>) datasets.getChildren("dataset", datasets.getNamespace())) { 669 String dsName = (String) e.getAttributeValue("name"); 670 if (dsList.contains(dsName)) {// Override with this DS 671 // Remove old DS 672 removeDataSet(allDataSets, dsName); 673 // throw new RuntimeException("Duplicate Dataset " + 674 // dsName); 675 } 676 else { 677 dsList.add(dsName); 678 } 679 allDataSets.addContent((Element) e.clone()); 680 } 681 } 682 insertDataSet(resolvedXml, allDataSets); 683 resolvedXml.removeChild("datasets", resolvedXml.getNamespace()); 684 } 685 686 /** 687 * Include One Dataset file. 688 * 689 * @param incDSFile : Include data set filename. 690 * @param dsList :List of dataset names to verify the duplicate. 691 * @param allDataSets : Element that includes all dataset definitions. 692 * @param dsNameSpace : Data set name space 693 * @throws CoordinatorJobException 694 * @throws Exception 695 */ 696 private void includeOneDSFile(String incDSFile, List<String> dsList, Element allDataSets, Namespace dsNameSpace) 697 throws CoordinatorJobException { 698 Element tmpDataSets = null; 699 try { 700 String dsXml = readDefinition(incDSFile); 701 log.debug("DSFILE :" + incDSFile + "\n" + dsXml); 702 tmpDataSets = XmlUtils.parseXml(dsXml); 703 } 704 /* 705 * catch (IOException iex) {XLog.getLog(getClass()).warn( 706 * "Error reading included dataset file [{0}]. Message [{1}]", 707 * incDSFile, iex.getMessage()); throw new 708 * CommandException(ErrorCode.E0803, iex.getMessage()); } 709 */ 710 catch (JDOMException e) { 711 log.warn("Error parsing included dataset [{0}]. Message [{1}]", incDSFile, e.getMessage()); 712 throw new CoordinatorJobException(ErrorCode.E0700, e.getMessage()); 713 } 714 resolveDataSets((List<Element>) tmpDataSets.getChildren("dataset")); 715 for (Element e : (List<Element>) tmpDataSets.getChildren("dataset")) { 716 String dsName = (String) e.getAttributeValue("name"); 717 if (dsList.contains(dsName)) { 718 throw new RuntimeException("Duplicate Dataset " + dsName); 719 } 720 dsList.add(dsName); 721 Element tmp = (Element) e.clone(); 722 // TODO: Don't like to over-write the external/include DS's 723 // namespace 724 tmp.setNamespace(dsNameSpace);// TODO: 725 tmp.getChild("uri-template").setNamespace(dsNameSpace); 726 if (e.getChild("done-flag") != null) { 727 tmp.getChild("done-flag").setNamespace(dsNameSpace); 728 } 729 allDataSets.addContent(tmp); 730 } 731 // nested include 732 for (Element includeElem : (List<Element>) tmpDataSets.getChildren("include", tmpDataSets.getNamespace())) { 733 String incFile = includeElem.getTextTrim(); 734 // log.warn("incDSFile "+ incDSFile); 735 includeOneDSFile(incFile, dsList, allDataSets, dsNameSpace); 736 } 737 } 738 739 /** 740 * Remove a dataset from a list of dataset. 741 * 742 * @param eDatasets : List of dataset 743 * @param name : Dataset name to be removed. 744 */ 745 private static void removeDataSet(Element eDatasets, String name) { 746 for (Element eDataset : (List<Element>) eDatasets.getChildren("dataset", eDatasets.getNamespace())) { 747 if (eDataset.getAttributeValue("name").equals(name)) { 748 eDataset.detach(); 749 } 750 } 751 throw new RuntimeException("undefined dataset: " + name); 752 } 753 754 /** 755 * Read workflow definition. 756 * 757 * @param appPath application path. 758 * @param user user name. 759 * @param group group name. 760 * @param autToken authentication token. 761 * @return workflow definition. 762 * @throws WorkflowException thrown if the definition could not be read. 763 */ 764 protected String readDefinition(String appPath) throws CoordinatorJobException { 765 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 766 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME); 767 Configuration confHadoop = CoordUtils.getHadoopConf(conf); 768 try { 769 URI uri = new URI(appPath); 770 log.debug("user =" + user + " group =" + group); 771 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri, conf); 772 Path p = new Path(uri.getPath()); 773 774 // Reader reader = new InputStreamReader(fs.open(new Path(uri 775 // .getPath(), fileName))); 776 Reader reader = new InputStreamReader(fs.open(p));// TODO 777 StringWriter writer = new StringWriter(); 778 IOUtils.copyCharStream(reader, writer); 779 return writer.toString(); 780 } 781 catch (IOException ex) { 782 log.warn("IOException :" + XmlUtils.prettyPrint(confHadoop), ex); 783 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex); // TODO: 784 } 785 catch (URISyntaxException ex) { 786 log.warn("URISyException :" + ex.getMessage()); 787 throw new CoordinatorJobException(ErrorCode.E1002, appPath, ex.getMessage(), ex);// TODO: 788 } 789 catch (HadoopAccessorException ex) { 790 throw new CoordinatorJobException(ex); 791 } 792 catch (Exception ex) { 793 log.warn("Exception :", ex); 794 throw new CoordinatorJobException(ErrorCode.E1001, ex.getMessage(), ex);// TODO: 795 } 796 } 797 798 /** 799 * Write a Coordinator Job into database 800 * 801 * @param eJob : XML element of job 802 * @param store : Coordinator Store to write. 803 * @param coordJob : Coordinator job bean 804 * @return Job if. 805 * @throws StoreException 806 */ 807 private String storeToDB(Element eJob, CoordinatorStore store, CoordinatorJobBean coordJob) throws StoreException { 808 String jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.COORDINATOR); 809 coordJob.setId(jobId); 810 coordJob.setAuthToken(this.authToken); 811 coordJob.setAppName(eJob.getAttributeValue("name")); 812 coordJob.setAppPath(conf.get(OozieClient.COORDINATOR_APP_PATH)); 813 coordJob.setStatus(CoordinatorJob.Status.PREP); 814 coordJob.setCreatedTime(new Date()); // TODO: Do we need that? 815 coordJob.setUser(conf.get(OozieClient.USER_NAME)); 816 coordJob.setGroup(conf.get(OozieClient.GROUP_NAME)); 817 coordJob.setConf(XmlUtils.prettyPrint(conf).toString()); 818 coordJob.setJobXml(XmlUtils.prettyPrint(eJob).toString()); 819 coordJob.setLastActionNumber(0); 820 coordJob.setLastModifiedTime(new Date()); 821 822 if (!dryrun) { 823 store.insertCoordinatorJob(coordJob); 824 } 825 return jobId; 826 } 827 828 /** 829 * For unit-testing only. Will ultimately go away 830 * 831 * @param args 832 * @throws Exception 833 * @throws JDOMException 834 */ 835 public static void main(String[] args) throws Exception { 836 // TODO Auto-generated method stub 837 // Configuration conf = new XConfiguration(IOUtils.getResourceAsReader( 838 // "org/apache/oozie/coord/conf.xml", -1)); 839 840 Configuration conf = new XConfiguration(); 841 842 // base case 843 // conf.set(OozieClient.COORDINATOR_APP_PATH, 844 // "file:///Users/danielwo/oozie/workflows/coord/test1/"); 845 846 // no input datasets 847 // conf.set(OozieClient.COORDINATOR_APP_PATH, 848 // "file:///Users/danielwo/oozie/workflows/coord/coord_noinput/"); 849 // conf.set(OozieClient.COORDINATOR_APP_PATH, 850 // "file:///Users/danielwo/oozie/workflows/coord/coord_use_apppath/"); 851 852 // only 1 instance 853 // conf.set(OozieClient.COORDINATOR_APP_PATH, 854 // "file:///Users/danielwo/oozie/workflows/coord/coord_oneinstance/"); 855 856 // no local props in xml 857 // conf.set(OozieClient.COORDINATOR_APP_PATH, 858 // "file:///Users/danielwo/oozie/workflows/coord/coord_noprops/"); 859 860 conf.set(OozieClient.COORDINATOR_APP_PATH, 861 "file:///homes/test/workspace/sandbox_krishna/oozie-main/core/src/main/java/org/apache/oozie/coord/"); 862 conf.set(OozieClient.USER_NAME, "test"); 863 // conf.set(OozieClient.USER_NAME, "danielwo"); 864 conf.set(OozieClient.GROUP_NAME, "other"); 865 // System.out.println("appXml :"+ appXml + "\n conf :"+ conf); 866 new Services().init(); 867 try { 868 CoordSubmitCommand sc = new CoordSubmitCommand(conf, "TESTING"); 869 String jobId = sc.call(); 870 System.out.println("Job Id " + jobId); 871 Thread.sleep(80000); 872 } 873 finally { 874 Services.get().destroy(); 875 } 876 } 877 }