001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.bundle; 019 020import java.io.IOException; 021import java.io.InputStreamReader; 022import java.io.Reader; 023import java.io.StringReader; 024import java.io.StringWriter; 025import java.net.URI; 026import java.net.URISyntaxException; 027import java.util.Date; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032 033import javax.xml.transform.stream.StreamSource; 034import javax.xml.validation.Validator; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.oozie.BundleJobBean; 040import org.apache.oozie.ErrorCode; 041import org.apache.oozie.client.Job; 042import org.apache.oozie.client.OozieClient; 043import org.apache.oozie.command.CommandException; 044import org.apache.oozie.command.PreconditionException; 045import org.apache.oozie.command.SubmitTransitionXCommand; 046import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 047import org.apache.oozie.service.HadoopAccessorException; 048import org.apache.oozie.service.HadoopAccessorService; 049import org.apache.oozie.service.SchemaService; 050import org.apache.oozie.service.Services; 051import org.apache.oozie.service.UUIDService; 052import org.apache.oozie.service.SchemaService.SchemaName; 053import org.apache.oozie.service.UUIDService.ApplicationType; 054import org.apache.oozie.util.ConfigUtils; 055import org.apache.oozie.util.DateUtils; 056import org.apache.oozie.util.ELEvaluator; 057import org.apache.oozie.util.ELUtils; 058import org.apache.oozie.util.IOUtils; 059import org.apache.oozie.util.InstrumentUtils; 060import org.apache.oozie.util.LogUtils; 061import org.apache.oozie.util.ParamChecker; 062import org.apache.oozie.util.PropertiesUtils; 063import org.apache.oozie.util.XConfiguration; 064import org.apache.oozie.util.XmlUtils; 065import org.apache.oozie.util.ParameterVerifier; 066import org.jdom.Attribute; 067import org.jdom.Element; 068import org.jdom.JDOMException; 069import org.xml.sax.SAXException; 070 071/** 072 * This Command will submit the bundle. 073 */ 074public class BundleSubmitXCommand extends SubmitTransitionXCommand { 075 076 private Configuration conf; 077 public static final String CONFIG_DEFAULT = "bundle-config-default.xml"; 078 public static final String BUNDLE_XML_FILE = "bundle.xml"; 079 private final BundleJobBean bundleBean = new BundleJobBean(); 080 private String jobId; 081 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 082 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 083 084 static { 085 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 086 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 087 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 088 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 089 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 090 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 091 092 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER}; 093 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 094 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 095 } 096 097 /** 098 * Constructor to create the bundle submit command. 099 * 100 * @param conf configuration for bundle job 101 */ 102 public BundleSubmitXCommand(Configuration conf) { 103 super("bundle_submit", "bundle_submit", 1); 104 this.conf = ParamChecker.notNull(conf, "conf"); 105 } 106 107 /** 108 * Constructor to create the bundle submit command. 109 * 110 * @param dryrun true if dryrun is enable 111 * @param conf configuration for bundle job 112 */ 113 public BundleSubmitXCommand(boolean dryrun, Configuration conf) { 114 this(conf); 115 this.dryrun = dryrun; 116 } 117 118 /* (non-Javadoc) 119 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit() 120 */ 121 @Override 122 protected String submit() throws CommandException { 123 LOG.info("STARTED Bundle Submit"); 124 try { 125 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 126 127 ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml())); 128 129 String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString()); 130 // Resolving all variables in the job properties. 131 // This ensures the Hadoop Configuration semantics is preserved. 132 XConfiguration resolvedVarsConf = new XConfiguration(); 133 for (Map.Entry<String, String> entry : conf) { 134 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 135 } 136 conf = resolvedVarsConf; 137 138 String resolvedJobXml = resolvedVars(jobXmlWithNoComment, conf); 139 140 //verify the uniqueness of coord names 141 verifyCoordNameUnique(resolvedJobXml); 142 this.jobId = storeToDB(bundleBean, resolvedJobXml); 143 LogUtils.setLogInfo(bundleBean, logInfo); 144 145 if (dryrun) { 146 Date startTime = bundleBean.getStartTime(); 147 long startTimeMilli = startTime.getTime(); 148 long endTimeMilli = startTimeMilli + (3600 * 1000); 149 Date jobEndTime = bundleBean.getEndTime(); 150 Date endTime = new Date(endTimeMilli); 151 if (endTime.compareTo(jobEndTime) > 0) { 152 endTime = jobEndTime; 153 } 154 jobId = bundleBean.getId(); 155 LOG.info("[" + jobId + "]: Update status to PREP"); 156 bundleBean.setStatus(Job.Status.PREP); 157 try { 158 new XConfiguration(new StringReader(bundleBean.getConf())); 159 } 160 catch (IOException e1) { 161 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1); 162 } 163 String output = bundleBean.getJobXml() + System.getProperty("line.separator"); 164 return output; 165 } 166 else { 167 if (bundleBean.getKickoffTime() == null) { 168 // If there is no KickOffTime, default kickoff is NOW. 169 LOG.debug("Since kickoff time is not defined for job id " + jobId 170 + ". Queuing and BundleStartXCommand immediately after submission"); 171 queue(new BundleStartXCommand(jobId)); 172 } 173 } 174 } 175 catch (Exception ex) { 176 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 177 } 178 LOG.info("ENDED Bundle Submit"); 179 return this.jobId; 180 } 181 182 /* (non-Javadoc) 183 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 184 */ 185 @Override 186 public void notifyParent() throws CommandException { 187 } 188 189 /* (non-Javadoc) 190 * @see org.apache.oozie.command.XCommand#getEntityKey() 191 */ 192 @Override 193 public String getEntityKey() { 194 return null; 195 } 196 197 /* (non-Javadoc) 198 * @see org.apache.oozie.command.XCommand#isLockRequired() 199 */ 200 @Override 201 protected boolean isLockRequired() { 202 return false; 203 } 204 205 @Override 206 protected void loadState() throws CommandException { 207 } 208 209 @Override 210 protected void verifyPrecondition() throws CommandException, PreconditionException { 211 } 212 213 @Override 214 protected void eagerLoadState() throws CommandException { 215 } 216 217 @Override 218 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 219 try { 220 mergeDefaultConfig(); 221 String appXml = readAndValidateXml(); 222 bundleBean.setOrigJobXml(appXml); 223 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 224 } 225 catch (BundleJobException ex) { 226 LOG.warn("BundleJobException: ", ex); 227 throw new CommandException(ex); 228 } 229 catch (IllegalArgumentException iex) { 230 LOG.warn("IllegalArgumentException: ", iex); 231 throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex); 232 } 233 catch (Exception ex) { 234 LOG.warn("Exception: ", ex); 235 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 236 } 237 } 238 239 /** 240 * Merge default configuration with user-defined configuration. 241 * 242 * @throws CommandException thrown if failed to merge configuration 243 */ 244 protected void mergeDefaultConfig() throws CommandException { 245 Path configDefault = null; 246 try { 247 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH); 248 Path bundleAppPath = new Path(bundleAppPathStr); 249 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 250 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 251 Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority()); 252 FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf); 253 254 // app path could be a directory 255 if (!fs.isFile(bundleAppPath)) { 256 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT); 257 } else { 258 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT); 259 } 260 261 if (fs.exists(configDefault)) { 262 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 263 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 264 XConfiguration.injectDefaults(defaultConf, conf); 265 } 266 else { 267 LOG.info("configDefault Doesn't exist " + configDefault); 268 } 269 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 270 } 271 catch (IOException e) { 272 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 273 + configDefault, e); 274 } 275 catch (HadoopAccessorException e) { 276 throw new CommandException(e); 277 } 278 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 279 } 280 281 /** 282 * Read the application XML and validate against bundle Schema 283 * 284 * @return validated bundle XML 285 * @throws BundleJobException thrown if failed to read or validate xml 286 */ 287 private String readAndValidateXml() throws BundleJobException { 288 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH); 289 String bundleXml = readDefinition(appPath); 290 validateXml(bundleXml); 291 return bundleXml; 292 } 293 294 /** 295 * Read bundle definition. 296 * 297 * @param appPath application path. 298 * @param user user name. 299 * @param group group name. 300 * @return bundle definition. 301 * @throws BundleJobException thrown if the definition could not be read. 302 */ 303 protected String readDefinition(String appPath) throws BundleJobException { 304 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 305 //Configuration confHadoop = CoordUtils.getHadoopConf(conf); 306 try { 307 URI uri = new URI(appPath); 308 LOG.debug("user =" + user); 309 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 310 Configuration fsConf = has.createJobConf(uri.getAuthority()); 311 FileSystem fs = has.createFileSystem(user, uri, fsConf); 312 Path appDefPath = null; 313 314 // app path could be a directory 315 Path path = new Path(uri.getPath()); 316 if (!fs.isFile(path)) { 317 appDefPath = new Path(path, BUNDLE_XML_FILE); 318 } else { 319 appDefPath = path; 320 } 321 322 Reader reader = new InputStreamReader(fs.open(appDefPath)); 323 StringWriter writer = new StringWriter(); 324 IOUtils.copyCharStream(reader, writer); 325 return writer.toString(); 326 } 327 catch (IOException ex) { 328 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 329 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 330 } 331 catch (URISyntaxException ex) { 332 LOG.warn("URISyException :" + ex.getMessage()); 333 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex); 334 } 335 catch (HadoopAccessorException ex) { 336 throw new BundleJobException(ex); 337 } 338 catch (Exception ex) { 339 LOG.warn("Exception :", ex); 340 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 341 } 342 } 343 344 /** 345 * Validate against Bundle XSD file 346 * 347 * @param xmlContent input bundle xml 348 * @throws BundleJobException thrown if failed to validate xml 349 */ 350 private void validateXml(String xmlContent) throws BundleJobException { 351 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE); 352 Validator validator = schema.newValidator(); 353 try { 354 validator.validate(new StreamSource(new StringReader(xmlContent))); 355 } 356 catch (SAXException ex) { 357 LOG.warn("SAXException :", ex); 358 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex); 359 } 360 catch (IOException ex) { 361 LOG.warn("IOException :", ex); 362 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex); 363 } 364 } 365 366 /** 367 * Write a Bundle Job into database 368 * 369 * @param Bundle job bean 370 * @return job id 371 * @throws CommandException thrown if failed to store bundle job bean to db 372 */ 373 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException { 374 try { 375 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE); 376 377 bundleJob.setId(jobId); 378 String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name"); 379 name = ELUtils.resolveAppName(name, conf); 380 bundleJob.setAppName(name); 381 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH)); 382 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class. 383 bundleJob.setCreatedTime(new Date()); 384 bundleJob.setUser(conf.get(OozieClient.USER_NAME)); 385 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 386 bundleJob.setGroup(group); 387 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString()); 388 bundleJob.setJobXml(resolvedJobXml); 389 Element jobElement = XmlUtils.parseXml(resolvedJobXml); 390 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace()); 391 if (controlsElement != null) { 392 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace()); 393 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) { 394 Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue()); 395 bundleJob.setKickoffTime(kickoffTime); 396 } 397 } 398 bundleJob.setLastModifiedTime(new Date()); 399 400 if (!dryrun) { 401 BundleJobQueryExecutor.getInstance().insert(bundleJob); 402 } 403 } 404 catch (Exception ex) { 405 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex); 406 } 407 return jobId; 408 } 409 410 /* (non-Javadoc) 411 * @see org.apache.oozie.command.TransitionXCommand#getJob() 412 */ 413 @Override 414 public Job getJob() { 415 return bundleBean; 416 } 417 418 /** 419 * Resolve job xml with conf 420 * 421 * @param bundleXml bundle job xml 422 * @param conf job configuration 423 * @return resolved job xml 424 * @throws BundleJobException thrown if failed to resolve variables 425 */ 426 private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException { 427 try { 428 ELEvaluator eval = createEvaluator(conf); 429 return eval.evaluate(bundleXml, String.class); 430 } 431 catch (Exception e) { 432 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 433 } 434 } 435 436 /** 437 * Create ELEvaluator 438 * 439 * @param conf job configuration 440 * @return ELEvaluator the evaluator for el function 441 * @throws BundleJobException thrown if failed to create evaluator 442 */ 443 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException { 444 ELEvaluator eval; 445 ELEvaluator.Context context; 446 try { 447 context = new ELEvaluator.Context(); 448 eval = new ELEvaluator(context); 449 for (Map.Entry<String, String> entry : conf) { 450 eval.setVariable(entry.getKey(), entry.getValue()); 451 } 452 } 453 catch (Exception e) { 454 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 455 } 456 return eval; 457 } 458 459 /** 460 * Verify the uniqueness of coordinator names 461 * 462 * @param resolved job xml 463 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names 464 */ 465 @SuppressWarnings("unchecked") 466 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException { 467 Set<String> set = new HashSet<String>(); 468 try { 469 Element bAppXml = XmlUtils.parseXml(resolvedJobXml); 470 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 471 for (Element elem : coordElems) { 472 Attribute name = elem.getAttribute("name"); 473 if (name != null) { 474 if (set.contains(name.getValue())) { 475 throw new CommandException(ErrorCode.E1304, name); 476 } 477 set.add(name.getValue()); 478 } 479 else { 480 throw new CommandException(ErrorCode.E1305); 481 } 482 } 483 } 484 catch (JDOMException jex) { 485 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex); 486 } 487 488 return null; 489 } 490 491 /* (non-Javadoc) 492 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 493 */ 494 @Override 495 public void updateJob() throws CommandException { 496 } 497 498 @Override 499 public void performWrites() throws CommandException { 500 } 501}