/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.bundle;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Validator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.SubmitTransitionXCommand;
import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.service.UUIDService.ApplicationType;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.xml.sax.SAXException;

/**
 * Command that submits a bundle job.
 * <p/>
 * During eager precondition verification the command merges the optional {@link #CONFIG_DEFAULT} file into the job
 * configuration, reads the bundle definition from the application path and validates it against the bundle schema.
 * {@link #submit()} then resolves the variables of the definition, verifies that coordinator names are unique and
 * populates the bundle job bean, which is inserted through the {@link JPAService} unless the command is a dry run.
 */
public class BundleSubmitXCommand extends SubmitTransitionXCommand {

    private Configuration conf;
    private final String authToken;
    public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
    public static final String BUNDLE_XML_FILE = "bundle.xml";
    private final BundleJobBean bundleBean = new BundleJobBean();
    private String jobId;
    private JPAService jpaService = null;

    /** Property names rejected when present in the merged job configuration. */
    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
    /** Property names rejected when present in the default configuration file. */
    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

    static {
        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT,
                PropertiesUtils.GROUPS };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);

        // The default-config set is the user set plus the Hadoop user property.
        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER };
        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
    }

    /**
     * Constructor to create the bundle submit command.
     *
     * @param conf configuration for the bundle job, must not be null
     * @param authToken token to be used for authentication, must not be empty
     */
    public BundleSubmitXCommand(Configuration conf, String authToken) {
        super("bundle_submit", "bundle_submit", 1);
        this.conf = ParamChecker.notNull(conf, "conf");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
    }

    /**
     * Constructor to create the bundle submit command.
     *
     * @param dryrun true if dry run is enabled
     * @param conf configuration for the bundle job, must not be null
     * @param authToken token to be used for authentication, must not be empty
     */
    public BundleSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
        this(conf, authToken);
        this.dryrun = dryrun;
    }

    /**
     * Resolve the bundle definition against the job configuration, verify it and store the job.
     * <p/>
     * For a dry run nothing is inserted and the resolved job XML followed by a line separator is returned instead
     * of the job id. For a real submission a {@link BundleStartXCommand} is queued when the definition carries no
     * kick-off time.
     *
     * @return the job id, or the resolved job XML for a dry run
     * @throws CommandException with {@link ErrorCode#E1310} wrapping any failure
     * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
     */
    @Override
    protected String submit() throws CommandException {
        LOG.info("STARTED Bundle Submit");
        try {
            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());

            // NOTE(review): the returned string is discarded, so this call only matters if it throws;
            // confirm whether the comment-free XML was meant to be stored.
            XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
            // Resolving all variables in the job properties.
            // This ensures the Hadoop Configuration semantics is preserved.
            XConfiguration resolvedVarsConf = new XConfiguration();
            for (Map.Entry<String, String> entry : conf) {
                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
            }
            conf = resolvedVarsConf;

            String resolvedJobXml = resolvedVars(bundleBean.getOrigJobXml(), conf);

            //verify the uniqueness of coord names
            verifyCoordNameUnique(resolvedJobXml);
            this.jobId = storeToDB(bundleBean, resolvedJobXml);
            LogUtils.setLogInfo(bundleBean, logInfo);

            if (dryrun) {
                // No start/end time arithmetic here: its results were never used, and nothing in this class
                // sets those times on the bean, so dereferencing them could only fail the dry run.
                jobId = bundleBean.getId();
                LOG.info("[" + jobId + "]: Update status to PREP");
                bundleBean.setStatus(Job.Status.PREP);
                try {
                    // Parsed only to report a malformed stored configuration; the result is not needed.
                    new XConfiguration(new StringReader(bundleBean.getConf()));
                }
                catch (IOException e1) {
                    LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
                }
                return bundleBean.getJobXml() + System.getProperty("line.separator");
            }

            if (bundleBean.getKickoffTime() == null) {
                // If there is no KickOffTime, default kickoff is NOW.
                LOG.debug("Since kickoff time is not defined for job id " + jobId
                        + ". Queuing and BundleStartXCommand immediately after submission");
                queue(new BundleStartXCommand(jobId));
            }
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
        }
        LOG.info("ENDED Bundle Submit");
        return this.jobId;
    }

    /**
     * No parent to notify for a bundle submission; intentionally empty.
     *
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() throws CommandException {
    }

    /**
     * @return always null
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return null;
    }

    /**
     * @return always false
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /**
     * Nothing to load; intentionally empty.
     *
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
    }

    /**
     * Nothing to verify at this stage; intentionally empty.
     *
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
    }

    /**
     * Look up the {@link JPAService} used later to insert the job.
     *
     * @throws CommandException with {@link ErrorCode#E0610} if the JPA service is not available
     * @see org.apache.oozie.command.XCommand#eagerLoadState()
     */
    @Override
    protected void eagerLoadState() throws CommandException {
        super.eagerLoadState();
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }
    }

    /**
     * Merge the default configuration, then read and validate the bundle definition and keep it on the job bean
     * as the original job XML.
     *
     * @throws CommandException wrapping any failure; failures other than {@link BundleJobException} are reported
     *         with {@link ErrorCode#E1310}
     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
     */
    @Override
    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
        try {
            super.eagerVerifyPrecondition();
            mergeDefaultConfig();
            String appXml = readAndValidateXml();
            bundleBean.setOrigJobXml(appXml);
            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
        }
        catch (BundleJobException ex) {
            LOG.warn("BundleJobException: ", ex);
            throw new CommandException(ex);
        }
        catch (IllegalArgumentException iex) {
            LOG.warn("IllegalArgumentException: ", iex);
            throw new CommandException(ErrorCode.E1310, iex);
        }
        catch (Exception ex) {
            LOG.warn("Exception: ", ex);
            throw new CommandException(ErrorCode.E1310, ex);
        }
    }

    /**
     * Merge default configuration with user-defined configuration.
     * <p/>
     * The default file {@link #CONFIG_DEFAULT} is looked up in the application directory, or next to the
     * application file when the application path is a file. Both the default file and the merged configuration
     * are checked for disallowed properties.
     *
     * @throws CommandException thrown if failed to merge configuration
     */
    protected void mergeDefaultConfig() throws CommandException {
        Path configDefault = null;
        try {
            String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
            Path bundleAppPath = new Path(bundleAppPathStr);
            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
            FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);

            // app path could be a directory
            if (!fs.isFile(bundleAppPath)) {
                configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
            }
            else {
                configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
            }

            if (fs.exists(configDefault)) {
                Configuration defaultConf;
                InputStream defaultStream = fs.open(configDefault);
                try {
                    defaultConf = new XConfiguration(defaultStream);
                }
                finally {
                    // Always release the file system stream, also when parsing fails.
                    defaultStream.close();
                }
                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
                XConfiguration.injectDefaults(defaultConf, conf);
            }
            else {
                LOG.info("configDefault Doesn't exist " + configDefault);
            }
            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
        }
        catch (IOException e) {
            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
                    + configDefault, e);
        }
        catch (HadoopAccessorException e) {
            throw new CommandException(e);
        }
        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
    }

    /**
     * Read the application XML and validate against bundle Schema
     *
     * @return validated bundle XML
     * @throws BundleJobException thrown if failed to read or validate xml
     */
    private String readAndValidateXml() throws BundleJobException {
        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
        String bundleXml = readDefinition(appPath);
        validateXml(bundleXml);
        return bundleXml;
    }

    /**
     * Read bundle definition.
     * <p/>
     * The file system is accessed as the user named by {@link OozieClient#USER_NAME} in the job configuration.
     * When the application path is not a file, {@link #BUNDLE_XML_FILE} inside it is read. The content is decoded
     * as UTF-8.
     *
     * @param appPath application path.
     * @return bundle definition.
     * @throws BundleJobException thrown if the definition could not be read.
     */
    protected String readDefinition(String appPath) throws BundleJobException {
        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
        try {
            URI uri = new URI(appPath);
            LOG.debug("user =" + user);
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createJobConf(uri.getAuthority());
            FileSystem fs = has.createFileSystem(user, uri, fsConf);
            Path appDefPath = null;

            // app path could be a directory
            Path path = new Path(uri.getPath());
            if (!fs.isFile(path)) {
                appDefPath = new Path(path, BUNDLE_XML_FILE);
            }
            else {
                appDefPath = path;
            }

            // Decode explicitly as UTF-8 so the result does not depend on the JVM default charset.
            Reader reader = new InputStreamReader(fs.open(appDefPath), "UTF-8");
            try {
                StringWriter writer = new StringWriter();
                IOUtils.copyCharStream(reader, writer);
                return writer.toString();
            }
            finally {
                // Release the stream even when the copy fails; closing an already closed reader is a no-op.
                reader.close();
            }
        }
        catch (IOException ex) {
            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
            throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
        }
        catch (URISyntaxException ex) {
            LOG.warn("URISyException :" + ex.getMessage());
            throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
        }
        catch (HadoopAccessorException ex) {
            throw new BundleJobException(ex);
        }
        catch (Exception ex) {
            LOG.warn("Exception :", ex);
            throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
        }
    }

    /**
     * Validate against Bundle XSD file
     *
     * @param xmlContent input bundle xml
     * @throws BundleJobException thrown if failed to validate xml
     */
    private void validateXml(String xmlContent) throws BundleJobException {
        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
        Validator validator = schema.newValidator();
        try {
            validator.validate(new StreamSource(new StringReader(xmlContent)));
        }
        catch (SAXException ex) {
            LOG.warn("SAXException :", ex);
            throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
        }
        catch (IOException ex) {
            LOG.warn("IOException :", ex);
            throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
        }
    }

    /**
     * Populate the bundle job bean and write it into the database.
     * <p/>
     * A new bundle id is generated and set on the bean. The application name is taken from the "name" attribute of
     * the original job XML and the kick-off time, if any, from the resolved job XML. The bean is inserted only
     * when the command is not a dry run.
     *
     * @param bundleJob bundle job bean to populate and store
     * @param resolvedJobXml bundle job xml with variables resolved
     * @return job id
     * @throws CommandException thrown if failed to store bundle job bean to db
     */
    private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
        try {
            jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);

            bundleJob.setId(jobId);
            bundleJob.setAuthToken(this.authToken);
            bundleJob.setAppName(XmlUtils.parseXml(bundleJob.getOrigJobXml()).getAttributeValue("name"));
            bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
            // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
            bundleJob.setCreatedTime(new Date());
            bundleJob.setUser(conf.get(OozieClient.USER_NAME));
            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME,
                    null);
            bundleJob.setGroup(group);
            bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
            bundleJob.setJobXml(resolvedJobXml);

            Element jobElement = XmlUtils.parseXml(resolvedJobXml);
            Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
            if (controlsElement != null) {
                Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
                if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
                    Date kickoffTime = DateUtils.parseDateUTC(kickoffTimeElement.getValue());
                    bundleJob.setKickoffTime(kickoffTime);
                }
            }
            bundleJob.setLastModifiedTime(new Date());

            if (!dryrun) {
                jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
            }
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
        }
        return jobId;
    }

    /**
     * @return the bundle job bean handled by this command
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return bundleBean;
    }

    /**
     * Resolve job xml with conf
     *
     * @param bundleXml bundle job xml
     * @param conf job configuration
     * @return resolved job xml
     * @throws BundleJobException thrown if failed to resolve variables
     */
    private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
        try {
            ELEvaluator eval = createEvaluator(conf);
            return eval.evaluate(bundleXml, String.class);
        }
        catch (Exception e) {
            throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
        }
    }

    /**
     * Create an ELEvaluator that has every entry of the configuration set as a variable.
     *
     * @param conf job configuration
     * @return ELEvaluator the evaluator for el function
     * @throws BundleJobException thrown if failed to create evaluator
     */
    public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
        ELEvaluator eval;
        ELEvaluator.Context context;
        try {
            context = new ELEvaluator.Context();
            eval = new ELEvaluator(context);
            for (Map.Entry<String, String> entry : conf) {
                eval.setVariable(entry.getKey(), entry.getValue());
            }
        }
        catch (Exception e) {
            throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
        }
        return eval;
    }

    /**
     * Verify the uniqueness of coordinator names
     *
     * @param resolvedJobXml bundle job xml with variables resolved
     * @throws CommandException with {@link ErrorCode#E1304} if two coordinators share a name, with
     *         {@link ErrorCode#E1305} if a coordinator has no name attribute, or with {@link ErrorCode#E1301} if
     *         the xml cannot be parsed
     */
    @SuppressWarnings("unchecked")
    private void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
        Set<String> seenNames = new HashSet<String>();
        try {
            Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
            for (Element elem : coordElems) {
                Attribute name = elem.getAttribute("name");
                if (name == null) {
                    throw new CommandException(ErrorCode.E1305);
                }
                // Set.add returns false when the name was already present.
                if (!seenNames.add(name.getValue())) {
                    // Report the plain name, not the JDOM attribute object.
                    throw new CommandException(ErrorCode.E1304, name.getValue());
                }
            }
        }
        catch (JDOMException jex) {
            throw new CommandException(ErrorCode.E1301, jex);
        }
    }

    /**
     * Nothing to update here; intentionally empty.
     *
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() throws CommandException {
    }
}