001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.bundle; 020 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.Reader; 024import java.io.StringReader; 025import java.io.StringWriter; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.util.Date; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033 034import javax.xml.transform.stream.StreamSource; 035import javax.xml.validation.Validator; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.oozie.BundleJobBean; 041import org.apache.oozie.ErrorCode; 042import org.apache.oozie.client.Job; 043import org.apache.oozie.client.OozieClient; 044import org.apache.oozie.command.CommandException; 045import org.apache.oozie.command.PreconditionException; 046import org.apache.oozie.command.SubmitTransitionXCommand; 047import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 048import org.apache.oozie.service.ELService; 049import org.apache.oozie.service.HadoopAccessorException; 050import org.apache.oozie.service.HadoopAccessorService; 051import org.apache.oozie.service.SchemaService; 052import org.apache.oozie.service.SchemaService.SchemaName; 053import org.apache.oozie.service.Services; 054import org.apache.oozie.service.UUIDService; 055import org.apache.oozie.service.UUIDService.ApplicationType; 056import org.apache.oozie.util.ConfigUtils; 057import org.apache.oozie.util.DateUtils; 058import org.apache.oozie.util.ELEvaluator; 059import org.apache.oozie.util.ELUtils; 060import org.apache.oozie.util.IOUtils; 061import org.apache.oozie.util.InstrumentUtils; 062import org.apache.oozie.util.LogUtils; 063import org.apache.oozie.util.ParamChecker; 064import org.apache.oozie.util.ParameterVerifier; 065import org.apache.oozie.util.PropertiesUtils; 066import org.apache.oozie.util.XConfiguration; 067import org.apache.oozie.util.XmlUtils; 068import org.jdom.Attribute; 069import org.jdom.Element; 070import org.jdom.JDOMException; 071import org.xml.sax.SAXException; 072 073/** 074 * This Command will submit the bundle. 075 */ 076public class BundleSubmitXCommand extends SubmitTransitionXCommand { 077 078 private Configuration conf; 079 public static final String CONFIG_DEFAULT = "bundle-config-default.xml"; 080 public static final String BUNDLE_XML_FILE = "bundle.xml"; 081 private final BundleJobBean bundleBean = new BundleJobBean(); 082 private String jobId; 083 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 084 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 085 086 static { 087 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 088 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 089 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 090 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 091 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 092 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 093 094 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 095 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 096 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 097 } 098 099 /** 100 * Constructor to create the bundle submit command. 101 * 102 * @param conf configuration for bundle job 103 */ 104 public BundleSubmitXCommand(Configuration conf) { 105 super("bundle_submit", "bundle_submit", 1); 106 this.conf = ParamChecker.notNull(conf, "conf"); 107 } 108 109 /** 110 * Constructor to create the bundle submit command. 111 * 112 * @param dryrun true if dryrun is enable 113 * @param conf configuration for bundle job 114 */ 115 public BundleSubmitXCommand(boolean dryrun, Configuration conf) { 116 this(conf); 117 this.dryrun = dryrun; 118 } 119 120 /* (non-Javadoc) 121 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit() 122 */ 123 @Override 124 protected String submit() throws CommandException { 125 LOG.info("STARTED Bundle Submit"); 126 try { 127 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 128 129 ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml())); 130 131 String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString()); 132 // Resolving all variables in the job properties. 133 // This ensures the Hadoop Configuration semantics is preserved. 134 XConfiguration resolvedVarsConf = new XConfiguration(); 135 for (Map.Entry<String, String> entry : conf) { 136 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 137 } 138 conf = resolvedVarsConf; 139 140 String resolvedJobXml = resolvedVarsandFunctions(jobXmlWithNoComment, conf); 141 142 //verify the uniqueness of coord names 143 verifyCoordNameUnique(resolvedJobXml); 144 this.jobId = storeToDB(bundleBean, resolvedJobXml); 145 LogUtils.setLogInfo(bundleBean); 146 147 if (dryrun) { 148 Date startTime = bundleBean.getStartTime(); 149 long startTimeMilli = startTime.getTime(); 150 long endTimeMilli = startTimeMilli + (3600 * 1000); 151 Date jobEndTime = bundleBean.getEndTime(); 152 Date endTime = new Date(endTimeMilli); 153 if (endTime.compareTo(jobEndTime) > 0) { 154 endTime = jobEndTime; 155 } 156 jobId = bundleBean.getId(); 157 LOG.info("[" + jobId + "]: Update status to PREP"); 158 bundleBean.setStatus(Job.Status.PREP); 159 try { 160 new XConfiguration(new StringReader(bundleBean.getConf())); 161 } 162 catch (IOException e1) { 163 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1); 164 } 165 String output = bundleBean.getJobXml() + System.getProperty("line.separator"); 166 return output; 167 } 168 else { 169 if (bundleBean.getKickoffTime() == null) { 170 // If there is no KickOffTime, default kickoff is NOW. 171 LOG.debug("Since kickoff time is not defined for job id " + jobId 172 + ". Queuing and BundleStartXCommand immediately after submission"); 173 queue(new BundleStartXCommand(jobId)); 174 } 175 } 176 } 177 catch (Exception ex) { 178 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 179 } 180 LOG.info("ENDED Bundle Submit"); 181 return this.jobId; 182 } 183 184 /* (non-Javadoc) 185 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 186 */ 187 @Override 188 public void notifyParent() throws CommandException { 189 } 190 191 /* (non-Javadoc) 192 * @see org.apache.oozie.command.XCommand#getEntityKey() 193 */ 194 @Override 195 public String getEntityKey() { 196 return null; 197 } 198 199 /* (non-Javadoc) 200 * @see org.apache.oozie.command.XCommand#isLockRequired() 201 */ 202 @Override 203 protected boolean isLockRequired() { 204 return false; 205 } 206 207 @Override 208 protected void loadState() throws CommandException { 209 } 210 211 @Override 212 protected void verifyPrecondition() throws CommandException, PreconditionException { 213 } 214 215 @Override 216 protected void eagerLoadState() throws CommandException { 217 } 218 219 @Override 220 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 221 try { 222 mergeDefaultConfig(); 223 String appXml = readAndValidateXml(); 224 bundleBean.setOrigJobXml(appXml); 225 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 226 } 227 catch (BundleJobException ex) { 228 LOG.warn("BundleJobException: ", ex); 229 throw new CommandException(ex); 230 } 231 catch (IllegalArgumentException iex) { 232 LOG.warn("IllegalArgumentException: ", iex); 233 throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex); 234 } 235 catch (Exception ex) { 236 LOG.warn("Exception: ", ex); 237 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 238 } 239 } 240 241 /** 242 * Merge default configuration with user-defined configuration. 243 * 244 * @throws CommandException thrown if failed to merge configuration 245 */ 246 protected void mergeDefaultConfig() throws CommandException { 247 Path configDefault = null; 248 try { 249 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH); 250 Path bundleAppPath = new Path(bundleAppPathStr); 251 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 252 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 253 Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority()); 254 FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf); 255 256 // app path could be a directory 257 if (!fs.isFile(bundleAppPath)) { 258 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT); 259 } else { 260 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT); 261 } 262 263 if (fs.exists(configDefault)) { 264 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 265 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 266 XConfiguration.injectDefaults(defaultConf, conf); 267 } 268 else { 269 LOG.info("configDefault Doesn't exist " + configDefault); 270 } 271 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 272 } 273 catch (IOException e) { 274 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 275 + configDefault, e); 276 } 277 catch (HadoopAccessorException e) { 278 throw new CommandException(e); 279 } 280 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 281 } 282 283 /** 284 * Read the application XML and validate against bundle Schema 285 * 286 * @return validated bundle XML 287 * @throws BundleJobException thrown if failed to read or validate xml 288 */ 289 private String readAndValidateXml() throws BundleJobException { 290 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH); 291 String bundleXml = readDefinition(appPath); 292 validateXml(bundleXml); 293 return bundleXml; 294 } 295 296 /** 297 * Read bundle definition. 298 * 299 * @param appPath application path. 300 * @return bundle definition. 301 * @throws BundleJobException thrown if the definition could not be read. 302 */ 303 protected String readDefinition(String appPath) throws BundleJobException { 304 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 305 //Configuration confHadoop = CoordUtils.getHadoopConf(conf); 306 try { 307 URI uri = new URI(appPath); 308 LOG.debug("user =" + user); 309 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 310 Configuration fsConf = has.createJobConf(uri.getAuthority()); 311 FileSystem fs = has.createFileSystem(user, uri, fsConf); 312 Path appDefPath = null; 313 314 // app path could be a directory 315 Path path = new Path(uri.getPath()); 316 if (!fs.isFile(path)) { 317 appDefPath = new Path(path, BUNDLE_XML_FILE); 318 } else { 319 appDefPath = path; 320 } 321 322 Reader reader = new InputStreamReader(fs.open(appDefPath)); 323 StringWriter writer = new StringWriter(); 324 IOUtils.copyCharStream(reader, writer); 325 return writer.toString(); 326 } 327 catch (IOException ex) { 328 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 329 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 330 } 331 catch (URISyntaxException ex) { 332 LOG.warn("URISyException :" + ex.getMessage()); 333 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex); 334 } 335 catch (HadoopAccessorException ex) { 336 throw new BundleJobException(ex); 337 } 338 catch (Exception ex) { 339 LOG.warn("Exception :", ex); 340 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 341 } 342 } 343 344 /** 345 * Validate against Bundle XSD file 346 * 347 * @param xmlContent input bundle xml 348 * @throws BundleJobException thrown if failed to validate xml 349 */ 350 private void validateXml(String xmlContent) throws BundleJobException { 351 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE); 352 Validator validator = schema.newValidator(); 353 try { 354 validator.validate(new StreamSource(new StringReader(xmlContent))); 355 } 356 catch (SAXException ex) { 357 LOG.warn("SAXException :", ex); 358 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex); 359 } 360 catch (IOException ex) { 361 LOG.warn("IOException :", ex); 362 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex); 363 } 364 } 365 366 /** 367 * Write a Bundle Job into database 368 * 369 * @param Bundle job bean 370 * @return job id 371 * @throws CommandException thrown if failed to store bundle job bean to db 372 */ 373 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException { 374 try { 375 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE); 376 377 bundleJob.setId(jobId); 378 String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name"); 379 name = ELUtils.resolveAppName(name, conf); 380 bundleJob.setAppName(name); 381 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH)); 382 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class. 383 bundleJob.setCreatedTime(new Date()); 384 bundleJob.setUser(conf.get(OozieClient.USER_NAME)); 385 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 386 bundleJob.setGroup(group); 387 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString()); 388 bundleJob.setJobXml(resolvedJobXml); 389 Element jobElement = XmlUtils.parseXml(resolvedJobXml); 390 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace()); 391 if (controlsElement != null) { 392 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace()); 393 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) { 394 Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue()); 395 bundleJob.setKickoffTime(kickoffTime); 396 } 397 } 398 bundleJob.setLastModifiedTime(new Date()); 399 400 if (!dryrun) { 401 BundleJobQueryExecutor.getInstance().insert(bundleJob); 402 } 403 } 404 catch (Exception ex) { 405 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex); 406 } 407 return jobId; 408 } 409 410 /* (non-Javadoc) 411 * @see org.apache.oozie.command.TransitionXCommand#getJob() 412 */ 413 @Override 414 public Job getJob() { 415 return bundleBean; 416 } 417 418 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) { 419 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group); 420 setConfigToEval(eval, conf); 421 return eval; 422 } 423 424 private static void setConfigToEval(ELEvaluator eval, Configuration conf) { 425 for (Map.Entry<String, String> entry : conf) { 426 eval.setVariable(entry.getKey(), entry.getValue().trim()); 427 } 428 } 429 430 /** 431 * Resolve job xml with conf 432 * 433 * @param bundleXml bundle job xml 434 * @param conf job configuration 435 * @return resolved job xml 436 * @throws BundleJobException thrown if failed to resolve variables 437 */ 438 private String resolvedVarsandFunctions(String bundleXml, Configuration conf) throws BundleJobException { 439 ELEvaluator eval; 440 try { 441 eval = createELEvaluatorForGroup(conf, "bundle-submit"); 442 return eval.evaluate(bundleXml, String.class); 443 } 444 catch (Exception e) { 445 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 446 } 447 } 448 449 /** 450 * Create ELEvaluator 451 * 452 * @param conf job configuration 453 * @return ELEvaluator the evaluator for el function 454 * @throws BundleJobException thrown if failed to create evaluator 455 */ 456 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException { 457 ELEvaluator eval; 458 ELEvaluator.Context context; 459 try { 460 context = new ELEvaluator.Context(); 461 eval = new ELEvaluator(context); 462 for (Map.Entry<String, String> entry : conf) { 463 eval.setVariable(entry.getKey(), entry.getValue()); 464 } 465 } 466 catch (Exception e) { 467 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 468 } 469 return eval; 470 } 471 472 /** 473 * Verify the uniqueness of coordinator names 474 * 475 * @param resolved job xml 476 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names 477 */ 478 @SuppressWarnings("unchecked") 479 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException { 480 Set<String> set = new HashSet<String>(); 481 try { 482 Element bAppXml = XmlUtils.parseXml(resolvedJobXml); 483 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 484 for (Element elem : coordElems) { 485 Attribute name = elem.getAttribute("name"); 486 if (name != null) { 487 String coordName = name.getValue(); 488 try { 489 coordName = ELUtils.resolveAppName(name.getValue(), conf); 490 } 491 catch (Exception e) { 492 throw new CommandException(ErrorCode.E1321, e.getMessage(), e); 493 } 494 if (set.contains(coordName)) { 495 throw new CommandException(ErrorCode.E1304, name); 496 } 497 set.add(coordName); 498 } 499 else { 500 throw new CommandException(ErrorCode.E1305); 501 } 502 } 503 } 504 catch (JDOMException jex) { 505 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex); 506 } 507 508 return null; 509 } 510 511 /* (non-Javadoc) 512 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 513 */ 514 @Override 515 public void updateJob() throws CommandException { 516 } 517 518 @Override 519 public void performWrites() throws CommandException { 520 } 521}