001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.io.IOException; 021 import java.io.InputStreamReader; 022 import java.io.Reader; 023 import java.io.StringReader; 024 import java.io.StringWriter; 025 import java.net.URI; 026 import java.net.URISyntaxException; 027 import java.util.Date; 028 import java.util.HashSet; 029 import java.util.List; 030 import java.util.Map; 031 import java.util.Set; 032 033 import javax.xml.transform.stream.StreamSource; 034 import javax.xml.validation.Validator; 035 036 import org.apache.hadoop.conf.Configuration; 037 import org.apache.hadoop.fs.FileSystem; 038 import org.apache.hadoop.fs.Path; 039 import org.apache.oozie.BundleJobBean; 040 import org.apache.oozie.ErrorCode; 041 import org.apache.oozie.client.Job; 042 import org.apache.oozie.client.OozieClient; 043 import org.apache.oozie.command.CommandException; 044 import org.apache.oozie.command.PreconditionException; 045 import org.apache.oozie.command.SubmitTransitionXCommand; 046 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor; 047 import org.apache.oozie.service.HadoopAccessorException; 048 import org.apache.oozie.service.HadoopAccessorService; 049 import org.apache.oozie.service.JPAService; 050 import org.apache.oozie.service.SchemaService; 051 import org.apache.oozie.service.Services; 052 import org.apache.oozie.service.UUIDService; 053 import org.apache.oozie.service.SchemaService.SchemaName; 054 import org.apache.oozie.service.UUIDService.ApplicationType; 055 import org.apache.oozie.util.ConfigUtils; 056 import org.apache.oozie.util.DateUtils; 057 import org.apache.oozie.util.ELEvaluator; 058 import org.apache.oozie.util.ELUtils; 059 import org.apache.oozie.util.IOUtils; 060 import org.apache.oozie.util.InstrumentUtils; 061 import org.apache.oozie.util.LogUtils; 062 import org.apache.oozie.util.ParamChecker; 063 import org.apache.oozie.util.PropertiesUtils; 064 import org.apache.oozie.util.XConfiguration; 065 import org.apache.oozie.util.XmlUtils; 066 import org.apache.oozie.util.ParameterVerifier; 067 import org.jdom.Attribute; 068 import org.jdom.Element; 069 import org.jdom.JDOMException; 070 import org.xml.sax.SAXException; 071 072 /** 073 * This Command will submit the bundle. 074 */ 075 public class BundleSubmitXCommand extends SubmitTransitionXCommand { 076 077 private Configuration conf; 078 private final String authToken; 079 public static final String CONFIG_DEFAULT = "bundle-config-default.xml"; 080 public static final String BUNDLE_XML_FILE = "bundle.xml"; 081 private final BundleJobBean bundleBean = new BundleJobBean(); 082 private String jobId; 083 private JPAService jpaService = null; 084 085 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 086 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 087 088 static { 089 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 090 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 091 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 092 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 093 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 094 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 095 096 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 097 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 098 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 099 } 100 101 /** 102 * Constructor to create the bundle submit command. 103 * 104 * @param conf configuration for bundle job 105 * @param authToken to be used for authentication 106 */ 107 public BundleSubmitXCommand(Configuration conf, String authToken) { 108 super("bundle_submit", "bundle_submit", 1); 109 this.conf = ParamChecker.notNull(conf, "conf"); 110 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 111 } 112 113 /** 114 * Constructor to create the bundle submit command. 115 * 116 * @param dryrun true if dryrun is enable 117 * @param conf configuration for bundle job 118 * @param authToken to be used for authentication 119 */ 120 public BundleSubmitXCommand(boolean dryrun, Configuration conf, String authToken) { 121 this(conf, authToken); 122 this.dryrun = dryrun; 123 } 124 125 /* (non-Javadoc) 126 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit() 127 */ 128 @Override 129 protected String submit() throws CommandException { 130 LOG.info("STARTED Bundle Submit"); 131 try { 132 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 133 134 ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml())); 135 136 String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString()); 137 // Resolving all variables in the job properties. 138 // This ensures the Hadoop Configuration semantics is preserved. 139 XConfiguration resolvedVarsConf = new XConfiguration(); 140 for (Map.Entry<String, String> entry : conf) { 141 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 142 } 143 conf = resolvedVarsConf; 144 145 String resolvedJobXml = resolvedVars(jobXmlWithNoComment, conf); 146 147 //verify the uniqueness of coord names 148 verifyCoordNameUnique(resolvedJobXml); 149 this.jobId = storeToDB(bundleBean, resolvedJobXml); 150 LogUtils.setLogInfo(bundleBean, logInfo); 151 152 if (dryrun) { 153 Date startTime = bundleBean.getStartTime(); 154 long startTimeMilli = startTime.getTime(); 155 long endTimeMilli = startTimeMilli + (3600 * 1000); 156 Date jobEndTime = bundleBean.getEndTime(); 157 Date endTime = new Date(endTimeMilli); 158 if (endTime.compareTo(jobEndTime) > 0) { 159 endTime = jobEndTime; 160 } 161 jobId = bundleBean.getId(); 162 LOG.info("[" + jobId + "]: Update status to PREP"); 163 bundleBean.setStatus(Job.Status.PREP); 164 try { 165 new XConfiguration(new StringReader(bundleBean.getConf())); 166 } 167 catch (IOException e1) { 168 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1); 169 } 170 String output = bundleBean.getJobXml() + System.getProperty("line.separator"); 171 return output; 172 } 173 else { 174 if (bundleBean.getKickoffTime() == null) { 175 // If there is no KickOffTime, default kickoff is NOW. 176 LOG.debug("Since kickoff time is not defined for job id " + jobId 177 + ". Queuing and BundleStartXCommand immediately after submission"); 178 queue(new BundleStartXCommand(jobId)); 179 } 180 } 181 } 182 catch (Exception ex) { 183 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 184 } 185 LOG.info("ENDED Bundle Submit"); 186 return this.jobId; 187 } 188 189 /* (non-Javadoc) 190 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 191 */ 192 @Override 193 public void notifyParent() throws CommandException { 194 } 195 196 /* (non-Javadoc) 197 * @see org.apache.oozie.command.XCommand#getEntityKey() 198 */ 199 @Override 200 public String getEntityKey() { 201 return null; 202 } 203 204 /* (non-Javadoc) 205 * @see org.apache.oozie.command.XCommand#isLockRequired() 206 */ 207 @Override 208 protected boolean isLockRequired() { 209 return false; 210 } 211 212 /* (non-Javadoc) 213 * @see org.apache.oozie.command.XCommand#loadState() 214 */ 215 @Override 216 protected void loadState() throws CommandException { 217 } 218 219 /* (non-Javadoc) 220 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 221 */ 222 @Override 223 protected void verifyPrecondition() throws CommandException, PreconditionException { 224 } 225 226 /* (non-Javadoc) 227 * @see org.apache.oozie.command.XCommand#eagerLoadState() 228 */ 229 @Override 230 protected void eagerLoadState() throws CommandException { 231 super.eagerLoadState(); 232 jpaService = Services.get().get(JPAService.class); 233 if (jpaService == null) { 234 throw new CommandException(ErrorCode.E0610); 235 } 236 } 237 238 /* (non-Javadoc) 239 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 240 */ 241 @Override 242 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 243 try { 244 super.eagerVerifyPrecondition(); 245 mergeDefaultConfig(); 246 String appXml = readAndValidateXml(); 247 bundleBean.setOrigJobXml(appXml); 248 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 249 } 250 catch (BundleJobException ex) { 251 LOG.warn("BundleJobException: ", ex); 252 throw new CommandException(ex); 253 } 254 catch (IllegalArgumentException iex) { 255 LOG.warn("IllegalArgumentException: ", iex); 256 throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex); 257 } 258 catch (Exception ex) { 259 LOG.warn("Exception: ", ex); 260 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 261 } 262 } 263 264 /** 265 * Merge default configuration with user-defined configuration. 266 * 267 * @throws CommandException thrown if failed to merge configuration 268 */ 269 protected void mergeDefaultConfig() throws CommandException { 270 Path configDefault = null; 271 try { 272 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH); 273 Path bundleAppPath = new Path(bundleAppPathStr); 274 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 275 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 276 Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority()); 277 FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf); 278 279 // app path could be a directory 280 if (!fs.isFile(bundleAppPath)) { 281 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT); 282 } else { 283 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT); 284 } 285 286 if (fs.exists(configDefault)) { 287 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 288 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 289 XConfiguration.injectDefaults(defaultConf, conf); 290 } 291 else { 292 LOG.info("configDefault Doesn't exist " + configDefault); 293 } 294 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 295 } 296 catch (IOException e) { 297 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 298 + configDefault, e); 299 } 300 catch (HadoopAccessorException e) { 301 throw new CommandException(e); 302 } 303 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 304 } 305 306 /** 307 * Read the application XML and validate against bundle Schema 308 * 309 * @return validated bundle XML 310 * @throws BundleJobException thrown if failed to read or validate xml 311 */ 312 private String readAndValidateXml() throws BundleJobException { 313 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH); 314 String bundleXml = readDefinition(appPath); 315 validateXml(bundleXml); 316 return bundleXml; 317 } 318 319 /** 320 * Read bundle definition. 321 * 322 * @param appPath application path. 323 * @param user user name. 324 * @param group group name. 325 * @param autToken authentication token. 326 * @return bundle definition. 327 * @throws BundleJobException thrown if the definition could not be read. 328 */ 329 protected String readDefinition(String appPath) throws BundleJobException { 330 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 331 //Configuration confHadoop = CoordUtils.getHadoopConf(conf); 332 try { 333 URI uri = new URI(appPath); 334 LOG.debug("user =" + user); 335 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 336 Configuration fsConf = has.createJobConf(uri.getAuthority()); 337 FileSystem fs = has.createFileSystem(user, uri, fsConf); 338 Path appDefPath = null; 339 340 // app path could be a directory 341 Path path = new Path(uri.getPath()); 342 if (!fs.isFile(path)) { 343 appDefPath = new Path(path, BUNDLE_XML_FILE); 344 } else { 345 appDefPath = path; 346 } 347 348 Reader reader = new InputStreamReader(fs.open(appDefPath)); 349 StringWriter writer = new StringWriter(); 350 IOUtils.copyCharStream(reader, writer); 351 return writer.toString(); 352 } 353 catch (IOException ex) { 354 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 355 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 356 } 357 catch (URISyntaxException ex) { 358 LOG.warn("URISyException :" + ex.getMessage()); 359 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex); 360 } 361 catch (HadoopAccessorException ex) { 362 throw new BundleJobException(ex); 363 } 364 catch (Exception ex) { 365 LOG.warn("Exception :", ex); 366 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 367 } 368 } 369 370 /** 371 * Validate against Bundle XSD file 372 * 373 * @param xmlContent input bundle xml 374 * @throws BundleJobException thrown if failed to validate xml 375 */ 376 private void validateXml(String xmlContent) throws BundleJobException { 377 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE); 378 Validator validator = schema.newValidator(); 379 try { 380 validator.validate(new StreamSource(new StringReader(xmlContent))); 381 } 382 catch (SAXException ex) { 383 LOG.warn("SAXException :", ex); 384 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex); 385 } 386 catch (IOException ex) { 387 LOG.warn("IOException :", ex); 388 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex); 389 } 390 } 391 392 /** 393 * Write a Bundle Job into database 394 * 395 * @param Bundle job bean 396 * @return job id 397 * @throws CommandException thrown if failed to store bundle job bean to db 398 */ 399 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException { 400 try { 401 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE); 402 403 bundleJob.setId(jobId); 404 bundleJob.setAuthToken(this.authToken); 405 String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name"); 406 name = ELUtils.resolveAppName(name, conf); 407 bundleJob.setAppName(name); 408 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH)); 409 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class. 410 bundleJob.setCreatedTime(new Date()); 411 bundleJob.setUser(conf.get(OozieClient.USER_NAME)); 412 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 413 bundleJob.setGroup(group); 414 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString()); 415 bundleJob.setJobXml(resolvedJobXml); 416 Element jobElement = XmlUtils.parseXml(resolvedJobXml); 417 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace()); 418 if (controlsElement != null) { 419 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace()); 420 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) { 421 Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue()); 422 bundleJob.setKickoffTime(kickoffTime); 423 } 424 } 425 bundleJob.setLastModifiedTime(new Date()); 426 427 if (!dryrun) { 428 jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob)); 429 } 430 } 431 catch (Exception ex) { 432 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex); 433 } 434 return jobId; 435 } 436 437 /* (non-Javadoc) 438 * @see org.apache.oozie.command.TransitionXCommand#getJob() 439 */ 440 @Override 441 public Job getJob() { 442 return bundleBean; 443 } 444 445 /** 446 * Resolve job xml with conf 447 * 448 * @param bundleXml bundle job xml 449 * @param conf job configuration 450 * @return resolved job xml 451 * @throws BundleJobException thrown if failed to resolve variables 452 */ 453 private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException { 454 try { 455 ELEvaluator eval = createEvaluator(conf); 456 return eval.evaluate(bundleXml, String.class); 457 } 458 catch (Exception e) { 459 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 460 } 461 } 462 463 /** 464 * Create ELEvaluator 465 * 466 * @param conf job configuration 467 * @return ELEvaluator the evaluator for el function 468 * @throws BundleJobException thrown if failed to create evaluator 469 */ 470 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException { 471 ELEvaluator eval; 472 ELEvaluator.Context context; 473 try { 474 context = new ELEvaluator.Context(); 475 eval = new ELEvaluator(context); 476 for (Map.Entry<String, String> entry : conf) { 477 eval.setVariable(entry.getKey(), entry.getValue()); 478 } 479 } 480 catch (Exception e) { 481 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 482 } 483 return eval; 484 } 485 486 /** 487 * Verify the uniqueness of coordinator names 488 * 489 * @param resolved job xml 490 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names 491 */ 492 @SuppressWarnings("unchecked") 493 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException { 494 Set<String> set = new HashSet<String>(); 495 try { 496 Element bAppXml = XmlUtils.parseXml(resolvedJobXml); 497 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 498 for (Element elem : coordElems) { 499 Attribute name = elem.getAttribute("name"); 500 if (name != null) { 501 if (set.contains(name.getValue())) { 502 throw new CommandException(ErrorCode.E1304, name); 503 } 504 set.add(name.getValue()); 505 } 506 else { 507 throw new CommandException(ErrorCode.E1305); 508 } 509 } 510 } 511 catch (JDOMException jex) { 512 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex); 513 } 514 515 return null; 516 } 517 518 /* (non-Javadoc) 519 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 520 */ 521 @Override 522 public void updateJob() throws CommandException { 523 } 524 525 @Override 526 public void performWrites() throws CommandException { 527 } 528 }