001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.io.IOException; 021 import java.io.InputStreamReader; 022 import java.io.Reader; 023 import java.io.StringReader; 024 import java.io.StringWriter; 025 import java.net.URI; 026 import java.net.URISyntaxException; 027 import java.util.Date; 028 import java.util.HashSet; 029 import java.util.List; 030 import java.util.Map; 031 import java.util.Set; 032 033 import javax.xml.transform.stream.StreamSource; 034 import javax.xml.validation.Validator; 035 036 import org.apache.hadoop.conf.Configuration; 037 import org.apache.hadoop.fs.FileSystem; 038 import org.apache.hadoop.fs.Path; 039 import org.apache.oozie.BundleJobBean; 040 import org.apache.oozie.ErrorCode; 041 import org.apache.oozie.client.Job; 042 import org.apache.oozie.client.OozieClient; 043 import org.apache.oozie.command.CommandException; 044 import org.apache.oozie.command.PreconditionException; 045 import org.apache.oozie.command.SubmitTransitionXCommand; 046 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor; 047 import org.apache.oozie.service.HadoopAccessorException; 048 import org.apache.oozie.service.HadoopAccessorService; 049 import org.apache.oozie.service.JPAService; 050 import org.apache.oozie.service.SchemaService; 051 import org.apache.oozie.service.Services; 052 import org.apache.oozie.service.UUIDService; 053 import org.apache.oozie.service.SchemaService.SchemaName; 054 import org.apache.oozie.service.UUIDService.ApplicationType; 055 import org.apache.oozie.util.ConfigUtils; 056 import org.apache.oozie.util.DateUtils; 057 import org.apache.oozie.util.ELEvaluator; 058 import org.apache.oozie.util.ELUtils; 059 import org.apache.oozie.util.IOUtils; 060 import org.apache.oozie.util.InstrumentUtils; 061 import org.apache.oozie.util.LogUtils; 062 import org.apache.oozie.util.ParamChecker; 063 import org.apache.oozie.util.PropertiesUtils; 064 import org.apache.oozie.util.XConfiguration; 065 import org.apache.oozie.util.XmlUtils; 066 import org.apache.oozie.util.ParameterVerifier; 067 import org.jdom.Attribute; 068 import org.jdom.Element; 069 import org.jdom.JDOMException; 070 import org.xml.sax.SAXException; 071 072 /** 073 * This Command will submit the bundle. 074 */ 075 public class BundleSubmitXCommand extends SubmitTransitionXCommand { 076 077 private Configuration conf; 078 public static final String CONFIG_DEFAULT = "bundle-config-default.xml"; 079 public static final String BUNDLE_XML_FILE = "bundle.xml"; 080 private final BundleJobBean bundleBean = new BundleJobBean(); 081 private String jobId; 082 private JPAService jpaService = null; 083 084 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 085 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 086 087 static { 088 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 089 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 090 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 091 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 092 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 093 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 094 095 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 096 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 097 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 098 } 099 100 /** 101 * Constructor to create the bundle submit command. 102 * 103 * @param conf configuration for bundle job 104 */ 105 public BundleSubmitXCommand(Configuration conf) { 106 super("bundle_submit", "bundle_submit", 1); 107 this.conf = ParamChecker.notNull(conf, "conf"); 108 } 109 110 /** 111 * Constructor to create the bundle submit command. 112 * 113 * @param dryrun true if dryrun is enable 114 * @param conf configuration for bundle job 115 */ 116 public BundleSubmitXCommand(boolean dryrun, Configuration conf) { 117 this(conf); 118 this.dryrun = dryrun; 119 } 120 121 /* (non-Javadoc) 122 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit() 123 */ 124 @Override 125 protected String submit() throws CommandException { 126 LOG.info("STARTED Bundle Submit"); 127 try { 128 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 129 130 ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml())); 131 132 String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString()); 133 // Resolving all variables in the job properties. 134 // This ensures the Hadoop Configuration semantics is preserved. 135 XConfiguration resolvedVarsConf = new XConfiguration(); 136 for (Map.Entry<String, String> entry : conf) { 137 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 138 } 139 conf = resolvedVarsConf; 140 141 String resolvedJobXml = resolvedVars(jobXmlWithNoComment, conf); 142 143 //verify the uniqueness of coord names 144 verifyCoordNameUnique(resolvedJobXml); 145 this.jobId = storeToDB(bundleBean, resolvedJobXml); 146 LogUtils.setLogInfo(bundleBean, logInfo); 147 148 if (dryrun) { 149 Date startTime = bundleBean.getStartTime(); 150 long startTimeMilli = startTime.getTime(); 151 long endTimeMilli = startTimeMilli + (3600 * 1000); 152 Date jobEndTime = bundleBean.getEndTime(); 153 Date endTime = new Date(endTimeMilli); 154 if (endTime.compareTo(jobEndTime) > 0) { 155 endTime = jobEndTime; 156 } 157 jobId = bundleBean.getId(); 158 LOG.info("[" + jobId + "]: Update status to PREP"); 159 bundleBean.setStatus(Job.Status.PREP); 160 try { 161 new XConfiguration(new StringReader(bundleBean.getConf())); 162 } 163 catch (IOException e1) { 164 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1); 165 } 166 String output = bundleBean.getJobXml() + System.getProperty("line.separator"); 167 return output; 168 } 169 else { 170 if (bundleBean.getKickoffTime() == null) { 171 // If there is no KickOffTime, default kickoff is NOW. 172 LOG.debug("Since kickoff time is not defined for job id " + jobId 173 + ". Queuing and BundleStartXCommand immediately after submission"); 174 queue(new BundleStartXCommand(jobId)); 175 } 176 } 177 } 178 catch (Exception ex) { 179 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 180 } 181 LOG.info("ENDED Bundle Submit"); 182 return this.jobId; 183 } 184 185 /* (non-Javadoc) 186 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 187 */ 188 @Override 189 public void notifyParent() throws CommandException { 190 } 191 192 /* (non-Javadoc) 193 * @see org.apache.oozie.command.XCommand#getEntityKey() 194 */ 195 @Override 196 public String getEntityKey() { 197 return null; 198 } 199 200 /* (non-Javadoc) 201 * @see org.apache.oozie.command.XCommand#isLockRequired() 202 */ 203 @Override 204 protected boolean isLockRequired() { 205 return false; 206 } 207 208 /* (non-Javadoc) 209 * @see org.apache.oozie.command.XCommand#loadState() 210 */ 211 @Override 212 protected void loadState() throws CommandException { 213 } 214 215 /* (non-Javadoc) 216 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 217 */ 218 @Override 219 protected void verifyPrecondition() throws CommandException, PreconditionException { 220 } 221 222 /* (non-Javadoc) 223 * @see org.apache.oozie.command.XCommand#eagerLoadState() 224 */ 225 @Override 226 protected void eagerLoadState() throws CommandException { 227 super.eagerLoadState(); 228 jpaService = Services.get().get(JPAService.class); 229 if (jpaService == null) { 230 throw new CommandException(ErrorCode.E0610); 231 } 232 } 233 234 /* (non-Javadoc) 235 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 236 */ 237 @Override 238 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 239 try { 240 super.eagerVerifyPrecondition(); 241 mergeDefaultConfig(); 242 String appXml = readAndValidateXml(); 243 bundleBean.setOrigJobXml(appXml); 244 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 245 } 246 catch (BundleJobException ex) { 247 LOG.warn("BundleJobException: ", ex); 248 throw new CommandException(ex); 249 } 250 catch (IllegalArgumentException iex) { 251 LOG.warn("IllegalArgumentException: ", iex); 252 throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex); 253 } 254 catch (Exception ex) { 255 LOG.warn("Exception: ", ex); 256 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 257 } 258 } 259 260 /** 261 * Merge default configuration with user-defined configuration. 262 * 263 * @throws CommandException thrown if failed to merge configuration 264 */ 265 protected void mergeDefaultConfig() throws CommandException { 266 Path configDefault = null; 267 try { 268 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH); 269 Path bundleAppPath = new Path(bundleAppPathStr); 270 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 271 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 272 Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority()); 273 FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf); 274 275 // app path could be a directory 276 if (!fs.isFile(bundleAppPath)) { 277 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT); 278 } else { 279 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT); 280 } 281 282 if (fs.exists(configDefault)) { 283 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 284 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 285 XConfiguration.injectDefaults(defaultConf, conf); 286 } 287 else { 288 LOG.info("configDefault Doesn't exist " + configDefault); 289 } 290 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 291 } 292 catch (IOException e) { 293 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 294 + configDefault, e); 295 } 296 catch (HadoopAccessorException e) { 297 throw new CommandException(e); 298 } 299 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 300 } 301 302 /** 303 * Read the application XML and validate against bundle Schema 304 * 305 * @return validated bundle XML 306 * @throws BundleJobException thrown if failed to read or validate xml 307 */ 308 private String readAndValidateXml() throws BundleJobException { 309 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH); 310 String bundleXml = readDefinition(appPath); 311 validateXml(bundleXml); 312 return bundleXml; 313 } 314 315 /** 316 * Read bundle definition. 317 * 318 * @param appPath application path. 319 * @param user user name. 320 * @param group group name. 321 * @return bundle definition. 322 * @throws BundleJobException thrown if the definition could not be read. 323 */ 324 protected String readDefinition(String appPath) throws BundleJobException { 325 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 326 //Configuration confHadoop = CoordUtils.getHadoopConf(conf); 327 try { 328 URI uri = new URI(appPath); 329 LOG.debug("user =" + user); 330 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 331 Configuration fsConf = has.createJobConf(uri.getAuthority()); 332 FileSystem fs = has.createFileSystem(user, uri, fsConf); 333 Path appDefPath = null; 334 335 // app path could be a directory 336 Path path = new Path(uri.getPath()); 337 if (!fs.isFile(path)) { 338 appDefPath = new Path(path, BUNDLE_XML_FILE); 339 } else { 340 appDefPath = path; 341 } 342 343 Reader reader = new InputStreamReader(fs.open(appDefPath)); 344 StringWriter writer = new StringWriter(); 345 IOUtils.copyCharStream(reader, writer); 346 return writer.toString(); 347 } 348 catch (IOException ex) { 349 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 350 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 351 } 352 catch (URISyntaxException ex) { 353 LOG.warn("URISyException :" + ex.getMessage()); 354 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex); 355 } 356 catch (HadoopAccessorException ex) { 357 throw new BundleJobException(ex); 358 } 359 catch (Exception ex) { 360 LOG.warn("Exception :", ex); 361 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 362 } 363 } 364 365 /** 366 * Validate against Bundle XSD file 367 * 368 * @param xmlContent input bundle xml 369 * @throws BundleJobException thrown if failed to validate xml 370 */ 371 private void validateXml(String xmlContent) throws BundleJobException { 372 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE); 373 Validator validator = schema.newValidator(); 374 try { 375 validator.validate(new StreamSource(new StringReader(xmlContent))); 376 } 377 catch (SAXException ex) { 378 LOG.warn("SAXException :", ex); 379 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex); 380 } 381 catch (IOException ex) { 382 LOG.warn("IOException :", ex); 383 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex); 384 } 385 } 386 387 /** 388 * Write a Bundle Job into database 389 * 390 * @param Bundle job bean 391 * @return job id 392 * @throws CommandException thrown if failed to store bundle job bean to db 393 */ 394 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException { 395 try { 396 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE); 397 398 bundleJob.setId(jobId); 399 String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name"); 400 name = ELUtils.resolveAppName(name, conf); 401 bundleJob.setAppName(name); 402 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH)); 403 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class. 404 bundleJob.setCreatedTime(new Date()); 405 bundleJob.setUser(conf.get(OozieClient.USER_NAME)); 406 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 407 bundleJob.setGroup(group); 408 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString()); 409 bundleJob.setJobXml(resolvedJobXml); 410 Element jobElement = XmlUtils.parseXml(resolvedJobXml); 411 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace()); 412 if (controlsElement != null) { 413 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace()); 414 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) { 415 Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue()); 416 bundleJob.setKickoffTime(kickoffTime); 417 } 418 } 419 bundleJob.setLastModifiedTime(new Date()); 420 421 if (!dryrun) { 422 jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob)); 423 } 424 } 425 catch (Exception ex) { 426 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex); 427 } 428 return jobId; 429 } 430 431 /* (non-Javadoc) 432 * @see org.apache.oozie.command.TransitionXCommand#getJob() 433 */ 434 @Override 435 public Job getJob() { 436 return bundleBean; 437 } 438 439 /** 440 * Resolve job xml with conf 441 * 442 * @param bundleXml bundle job xml 443 * @param conf job configuration 444 * @return resolved job xml 445 * @throws BundleJobException thrown if failed to resolve variables 446 */ 447 private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException { 448 try { 449 ELEvaluator eval = createEvaluator(conf); 450 return eval.evaluate(bundleXml, String.class); 451 } 452 catch (Exception e) { 453 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 454 } 455 } 456 457 /** 458 * Create ELEvaluator 459 * 460 * @param conf job configuration 461 * @return ELEvaluator the evaluator for el function 462 * @throws BundleJobException thrown if failed to create evaluator 463 */ 464 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException { 465 ELEvaluator eval; 466 ELEvaluator.Context context; 467 try { 468 context = new ELEvaluator.Context(); 469 eval = new ELEvaluator(context); 470 for (Map.Entry<String, String> entry : conf) { 471 eval.setVariable(entry.getKey(), entry.getValue()); 472 } 473 } 474 catch (Exception e) { 475 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 476 } 477 return eval; 478 } 479 480 /** 481 * Verify the uniqueness of coordinator names 482 * 483 * @param resolved job xml 484 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names 485 */ 486 @SuppressWarnings("unchecked") 487 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException { 488 Set<String> set = new HashSet<String>(); 489 try { 490 Element bAppXml = XmlUtils.parseXml(resolvedJobXml); 491 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 492 for (Element elem : coordElems) { 493 Attribute name = elem.getAttribute("name"); 494 if (name != null) { 495 if (set.contains(name.getValue())) { 496 throw new CommandException(ErrorCode.E1304, name); 497 } 498 set.add(name.getValue()); 499 } 500 else { 501 throw new CommandException(ErrorCode.E1305); 502 } 503 } 504 } 505 catch (JDOMException jex) { 506 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex); 507 } 508 509 return null; 510 } 511 512 /* (non-Javadoc) 513 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 514 */ 515 @Override 516 public void updateJob() throws CommandException { 517 } 518 519 @Override 520 public void performWrites() throws CommandException { 521 } 522 }