001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.bundle; 020 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.Reader; 024import java.io.StringReader; 025import java.io.StringWriter; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.util.Date; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033 034import javax.xml.transform.stream.StreamSource; 035import javax.xml.validation.Validator; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.oozie.BundleJobBean; 041import org.apache.oozie.ErrorCode; 042import org.apache.oozie.client.Job; 043import org.apache.oozie.client.OozieClient; 044import org.apache.oozie.command.CommandException; 045import org.apache.oozie.command.PreconditionException; 046import org.apache.oozie.command.SubmitTransitionXCommand; 047import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 048import org.apache.oozie.service.ELService; 049import org.apache.oozie.service.HadoopAccessorException; 050import org.apache.oozie.service.HadoopAccessorService; 051import org.apache.oozie.service.SchemaService; 052import org.apache.oozie.service.SchemaService.SchemaName; 053import org.apache.oozie.service.Services; 054import org.apache.oozie.service.UUIDService; 055import org.apache.oozie.service.UUIDService.ApplicationType; 056import org.apache.oozie.util.ConfigUtils; 057import org.apache.oozie.util.DateUtils; 058import org.apache.oozie.util.ELEvaluator; 059import org.apache.oozie.util.ELUtils; 060import org.apache.oozie.util.IOUtils; 061import org.apache.oozie.util.InstrumentUtils; 062import org.apache.oozie.util.LogUtils; 063import org.apache.oozie.util.ParamChecker; 064import org.apache.oozie.util.ParameterVerifier; 065import org.apache.oozie.util.PropertiesUtils; 066import org.apache.oozie.util.XConfiguration; 067import org.apache.oozie.util.XmlUtils; 068import org.jdom.Attribute; 069import org.jdom.Element; 070import org.jdom.JDOMException; 071import org.xml.sax.SAXException; 072 073/** 074 * This Command will submit the bundle. 075 */ 076public class BundleSubmitXCommand extends SubmitTransitionXCommand { 077 078 private Configuration conf; 079 public static final String CONFIG_DEFAULT = "bundle-config-default.xml"; 080 public static final String BUNDLE_XML_FILE = "bundle.xml"; 081 private final BundleJobBean bundleBean = new BundleJobBean(); 082 private String jobId; 083 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 084 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 085 086 static { 087 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 088 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 089 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 090 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 091 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 092 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 093 094 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 095 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 096 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 097 } 098 099 /** 100 * Constructor to create the bundle submit command. 101 * 102 * @param conf configuration for bundle job 103 */ 104 public BundleSubmitXCommand(Configuration conf) { 105 super("bundle_submit", "bundle_submit", 1); 106 this.conf = ParamChecker.notNull(conf, "conf"); 107 } 108 109 /** 110 * Constructor to create the bundle submit command. 111 * 112 * @param dryrun true if dryrun is enable 113 * @param conf configuration for bundle job 114 */ 115 public BundleSubmitXCommand(boolean dryrun, Configuration conf) { 116 this(conf); 117 this.dryrun = dryrun; 118 } 119 120 /* (non-Javadoc) 121 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit() 122 */ 123 @Override 124 protected String submit() throws CommandException { 125 LOG.info("STARTED Bundle Submit"); 126 try { 127 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 128 129 ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml())); 130 131 String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString()); 132 // Resolving all variables in the job properties. 133 // This ensures the Hadoop Configuration semantics is preserved. 134 XConfiguration resolvedVarsConf = new XConfiguration(); 135 for (Map.Entry<String, String> entry : conf) { 136 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 137 } 138 conf = resolvedVarsConf; 139 140 String resolvedJobXml = resolvedVarsandFunctions(jobXmlWithNoComment, conf); 141 142 //verify the uniqueness of coord names 143 verifyCoordNameUnique(resolvedJobXml); 144 this.jobId = storeToDB(bundleBean, resolvedJobXml); 145 LogUtils.setLogInfo(bundleBean); 146 147 if (dryrun) { 148 Date startTime = bundleBean.getStartTime(); 149 long startTimeMilli = startTime.getTime(); 150 long endTimeMilli = startTimeMilli + (3600 * 1000); 151 Date jobEndTime = bundleBean.getEndTime(); 152 Date endTime = new Date(endTimeMilli); 153 if (endTime.compareTo(jobEndTime) > 0) { 154 endTime = jobEndTime; 155 } 156 jobId = bundleBean.getId(); 157 LOG.info("[" + jobId + "]: Update status to PREP"); 158 bundleBean.setStatus(Job.Status.PREP); 159 try { 160 new XConfiguration(new StringReader(bundleBean.getConf())); 161 } 162 catch (IOException e1) { 163 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1); 164 } 165 String output = bundleBean.getJobXml() + System.getProperty("line.separator"); 166 return output; 167 } 168 else { 169 if (bundleBean.getKickoffTime() == null) { 170 // If there is no KickOffTime, default kickoff is NOW. 171 LOG.debug("Since kickoff time is not defined for job id " + jobId 172 + ". Queuing and BundleStartXCommand immediately after submission"); 173 queue(new BundleStartXCommand(jobId)); 174 } 175 } 176 } 177 catch (Exception ex) { 178 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 179 } 180 LOG.info("ENDED Bundle Submit"); 181 return this.jobId; 182 } 183 184 /* (non-Javadoc) 185 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 186 */ 187 @Override 188 public void notifyParent() throws CommandException { 189 } 190 191 /* (non-Javadoc) 192 * @see org.apache.oozie.command.XCommand#getEntityKey() 193 */ 194 @Override 195 public String getEntityKey() { 196 return null; 197 } 198 199 /* (non-Javadoc) 200 * @see org.apache.oozie.command.XCommand#isLockRequired() 201 */ 202 @Override 203 protected boolean isLockRequired() { 204 return false; 205 } 206 207 @Override 208 protected void loadState() throws CommandException { 209 } 210 211 @Override 212 protected void verifyPrecondition() throws CommandException, PreconditionException { 213 } 214 215 @Override 216 protected void eagerLoadState() throws CommandException { 217 } 218 219 @Override 220 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 221 try { 222 mergeDefaultConfig(); 223 String appXml = readAndValidateXml(); 224 bundleBean.setOrigJobXml(appXml); 225 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 226 } 227 catch (BundleJobException ex) { 228 LOG.warn("BundleJobException: ", ex); 229 throw new CommandException(ex); 230 } 231 catch (IllegalArgumentException iex) { 232 LOG.warn("IllegalArgumentException: ", iex); 233 throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex); 234 } 235 catch (Exception ex) { 236 LOG.warn("Exception: ", ex); 237 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 238 } 239 } 240 241 /** 242 * Merge default configuration with user-defined configuration. 243 * 244 * @throws CommandException thrown if failed to merge configuration 245 */ 246 protected void mergeDefaultConfig() throws CommandException { 247 Path configDefault = null; 248 try { 249 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH); 250 Path bundleAppPath = new Path(bundleAppPathStr); 251 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 252 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 253 Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority()); 254 FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf); 255 256 // app path could be a directory 257 if (!fs.isFile(bundleAppPath)) { 258 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT); 259 } else { 260 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT); 261 } 262 263 if (fs.exists(configDefault)) { 264 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 265 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 266 XConfiguration.injectDefaults(defaultConf, conf); 267 } 268 else { 269 LOG.info("configDefault Doesn't exist " + configDefault); 270 } 271 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 272 } 273 catch (IOException e) { 274 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 275 + configDefault, e); 276 } 277 catch (HadoopAccessorException e) { 278 throw new CommandException(e); 279 } 280 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 281 } 282 283 /** 284 * Read the application XML and validate against bundle Schema 285 * 286 * @return validated bundle XML 287 * @throws BundleJobException thrown if failed to read or validate xml 288 */ 289 private String readAndValidateXml() throws BundleJobException { 290 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH); 291 String bundleXml = readDefinition(appPath); 292 validateXml(bundleXml); 293 return bundleXml; 294 } 295 296 /** 297 * Read bundle definition. 298 * 299 * @param appPath application path. 300 * @return bundle definition. 301 * @throws BundleJobException thrown if the definition could not be read. 302 */ 303 protected String readDefinition(String appPath) throws BundleJobException { 304 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 305 //Configuration confHadoop = CoordUtils.getHadoopConf(conf); 306 try { 307 URI uri = new URI(appPath); 308 LOG.debug("user =" + user); 309 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 310 Configuration fsConf = has.createJobConf(uri.getAuthority()); 311 FileSystem fs = has.createFileSystem(user, uri, fsConf); 312 Path appDefPath = null; 313 314 // app path could be a directory 315 Path path = new Path(uri.getPath()); 316 if (!fs.isFile(path)) { 317 appDefPath = new Path(path, BUNDLE_XML_FILE); 318 } else { 319 appDefPath = path; 320 } 321 322 Reader reader = new InputStreamReader(fs.open(appDefPath)); 323 StringWriter writer = new StringWriter(); 324 IOUtils.copyCharStream(reader, writer); 325 return writer.toString(); 326 } 327 catch (IOException ex) { 328 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 329 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 330 } 331 catch (URISyntaxException ex) { 332 LOG.warn("URISyException :" + ex.getMessage()); 333 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex); 334 } 335 catch (HadoopAccessorException ex) { 336 throw new BundleJobException(ex); 337 } 338 catch (Exception ex) { 339 LOG.warn("Exception :", ex); 340 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 341 } 342 } 343 344 /** 345 * Validate against Bundle XSD file 346 * 347 * @param xmlContent input bundle xml 348 * @throws BundleJobException thrown if failed to validate xml 349 */ 350 private void validateXml(String xmlContent) throws BundleJobException { 351 try { 352 Validator validator = Services.get().get(SchemaService.class).getValidator(SchemaName.BUNDLE); 353 validator.validate(new StreamSource(new StringReader(xmlContent))); 354 } 355 catch (SAXException ex) { 356 LOG.warn("SAXException :", ex); 357 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex); 358 } 359 catch (IOException ex) { 360 LOG.warn("IOException :", ex); 361 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex); 362 } 363 } 364 365 /** 366 * Write a Bundle Job into database 367 * 368 * @param Bundle job bean 369 * @return job id 370 * @throws CommandException thrown if failed to store bundle job bean to db 371 */ 372 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException { 373 try { 374 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE); 375 376 bundleJob.setId(jobId); 377 String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name"); 378 name = ELUtils.resolveAppName(name, conf); 379 bundleJob.setAppName(name); 380 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH)); 381 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class. 382 bundleJob.setCreatedTime(new Date()); 383 bundleJob.setUser(conf.get(OozieClient.USER_NAME)); 384 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 385 bundleJob.setGroup(group); 386 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString()); 387 bundleJob.setJobXml(resolvedJobXml); 388 Element jobElement = XmlUtils.parseXml(resolvedJobXml); 389 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace()); 390 if (controlsElement != null) { 391 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace()); 392 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) { 393 Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue()); 394 bundleJob.setKickoffTime(kickoffTime); 395 } 396 } 397 bundleJob.setLastModifiedTime(new Date()); 398 399 if (!dryrun) { 400 BundleJobQueryExecutor.getInstance().insert(bundleJob); 401 } 402 } 403 catch (Exception ex) { 404 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex); 405 } 406 return jobId; 407 } 408 409 /* (non-Javadoc) 410 * @see org.apache.oozie.command.TransitionXCommand#getJob() 411 */ 412 @Override 413 public Job getJob() { 414 return bundleBean; 415 } 416 417 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 418 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 419 setConfigToEval(eval, conf); 420 return eval; 421 } 422 423 private static void setConfigToEval(ELEvaluator eval, Configuration conf) { 424 for (Map.Entry<String, String> entry : conf) { 425 eval.setVariable(entry.getKey(), entry.getValue().trim()); 426 } 427 } 428 429 /** 430 * Resolve job xml with conf 431 * 432 * @param bundleXml bundle job xml 433 * @param conf job configuration 434 * @return resolved job xml 435 * @throws BundleJobException thrown if failed to resolve variables 436 */ 437 private String resolvedVarsandFunctions(String bundleXml, Configuration conf) throws BundleJobException { 438 ELEvaluator eval; 439 try { 440 eval = createELEvaluatorForGroup(conf, "bundle-submit"); 441 return eval.evaluate(bundleXml, String.class); 442 } 443 catch (Exception e) { 444 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 445 } 446 } 447 448 /** 449 * Create ELEvaluator 450 * 451 * @param conf job configuration 452 * @return ELEvaluator the evaluator for el function 453 * @throws BundleJobException thrown if failed to create evaluator 454 */ 455 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException { 456 ELEvaluator eval; 457 ELEvaluator.Context context; 458 try { 459 context = new ELEvaluator.Context(); 460 eval = new ELEvaluator(context); 461 for (Map.Entry<String, String> entry : conf) { 462 eval.setVariable(entry.getKey(), entry.getValue()); 463 } 464 } 465 catch (Exception e) { 466 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 467 } 468 return eval; 469 } 470 471 /** 472 * Verify the uniqueness of coordinator names 473 * 474 * @param resolved job xml 475 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names 476 */ 477 @SuppressWarnings("unchecked") 478 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException { 479 Set<String> set = new HashSet<String>(); 480 try { 481 Element bAppXml = XmlUtils.parseXml(resolvedJobXml); 482 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 483 for (Element elem : coordElems) { 484 Attribute name = elem.getAttribute("name"); 485 if (name != null) { 486 String coordName = name.getValue(); 487 try { 488 coordName = ELUtils.resolveAppName(name.getValue(), conf); 489 } 490 catch (Exception e) { 491 throw new CommandException(ErrorCode.E1321, e.getMessage(), e); 492 } 493 if (set.contains(coordName)) { 494 throw new CommandException(ErrorCode.E1304, name); 495 } 496 set.add(coordName); 497 } 498 else { 499 throw new CommandException(ErrorCode.E1305); 500 } 501 } 502 } 503 catch (JDOMException jex) { 504 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex); 505 } 506 507 return null; 508 } 509 510 /* (non-Javadoc) 511 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 512 */ 513 @Override 514 public void updateJob() throws CommandException { 515 } 516 517 @Override 518 public void performWrites() throws CommandException { 519 } 520}