001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.io.IOException; 021 import java.io.InputStreamReader; 022 import java.io.Reader; 023 import java.io.StringReader; 024 import java.io.StringWriter; 025 import java.net.URI; 026 import java.net.URISyntaxException; 027 import java.util.Date; 028 import java.util.HashSet; 029 import java.util.List; 030 import java.util.Map; 031 import java.util.Set; 032 033 import javax.xml.transform.stream.StreamSource; 034 import javax.xml.validation.Validator; 035 036 import org.apache.hadoop.conf.Configuration; 037 import org.apache.hadoop.fs.FileSystem; 038 import org.apache.hadoop.fs.Path; 039 import org.apache.oozie.BundleJobBean; 040 import org.apache.oozie.ErrorCode; 041 import org.apache.oozie.client.Job; 042 import org.apache.oozie.client.OozieClient; 043 import org.apache.oozie.command.CommandException; 044 import org.apache.oozie.command.PreconditionException; 045 import org.apache.oozie.command.SubmitTransitionXCommand; 046 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor; 047 import org.apache.oozie.service.HadoopAccessorException; 048 import org.apache.oozie.service.HadoopAccessorService; 049 import org.apache.oozie.service.JPAService; 050 import org.apache.oozie.service.SchemaService; 051 import org.apache.oozie.service.Services; 052 import org.apache.oozie.service.UUIDService; 053 import org.apache.oozie.service.WorkflowAppService; 054 import org.apache.oozie.service.SchemaService.SchemaName; 055 import org.apache.oozie.service.UUIDService.ApplicationType; 056 import org.apache.oozie.util.DateUtils; 057 import org.apache.oozie.util.ELEvaluator; 058 import org.apache.oozie.util.IOUtils; 059 import org.apache.oozie.util.InstrumentUtils; 060 import org.apache.oozie.util.LogUtils; 061 import org.apache.oozie.util.ParamChecker; 062 import org.apache.oozie.util.PropertiesUtils; 063 import org.apache.oozie.util.XConfiguration; 064 import org.apache.oozie.util.XmlUtils; 065 import org.jdom.Attribute; 066 import org.jdom.Element; 067 import org.jdom.JDOMException; 068 import org.xml.sax.SAXException; 069 070 /** 071 * This Command will submit the bundle. 072 */ 073 public class BundleSubmitXCommand extends SubmitTransitionXCommand { 074 075 private Configuration conf; 076 private final String authToken; 077 public static final String CONFIG_DEFAULT = "bundle-config-default.xml"; 078 public static final String BUNDLE_XML_FILE = "bundle.xml"; 079 private final BundleJobBean bundleBean = new BundleJobBean(); 080 private String jobId; 081 private JPAService jpaService = null; 082 083 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>(); 084 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 085 086 static { 087 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY, 088 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS, 089 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, 090 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, 091 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS }; 092 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES); 093 094 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI, 095 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME }; 096 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 097 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 098 } 099 100 /** 101 * Constructor to create the bundle submit command. 102 * 103 * @param conf configuration for bundle job 104 * @param authToken to be used for authentication 105 */ 106 public BundleSubmitXCommand(Configuration conf, String authToken) { 107 super("bundle_submit", "bundle_submit", 1); 108 this.conf = ParamChecker.notNull(conf, "conf"); 109 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 110 } 111 112 /** 113 * Constructor to create the bundle submit command. 114 * 115 * @param dryrun true if dryrun is enable 116 * @param conf configuration for bundle job 117 * @param authToken to be used for authentication 118 */ 119 public BundleSubmitXCommand(boolean dryrun, Configuration conf, String authToken) { 120 this(conf, authToken); 121 this.dryrun = dryrun; 122 } 123 124 /* (non-Javadoc) 125 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit() 126 */ 127 @Override 128 protected String submit() throws CommandException { 129 LOG.info("STARTED Bundle Submit"); 130 try { 131 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 132 133 XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString()); 134 // Resolving all variables in the job properties. 135 // This ensures the Hadoop Configuration semantics is preserved. 136 XConfiguration resolvedVarsConf = new XConfiguration(); 137 for (Map.Entry<String, String> entry : conf) { 138 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey())); 139 } 140 conf = resolvedVarsConf; 141 142 String resolvedJobXml = resolvedVars(bundleBean.getOrigJobXml(), conf); 143 144 //verify the uniqueness of coord names 145 verifyCoordNameUnique(resolvedJobXml); 146 this.jobId = storeToDB(bundleBean, resolvedJobXml); 147 LogUtils.setLogInfo(bundleBean, logInfo); 148 149 if (dryrun) { 150 Date startTime = bundleBean.getStartTime(); 151 long startTimeMilli = startTime.getTime(); 152 long endTimeMilli = startTimeMilli + (3600 * 1000); 153 Date jobEndTime = bundleBean.getEndTime(); 154 Date endTime = new Date(endTimeMilli); 155 if (endTime.compareTo(jobEndTime) > 0) { 156 endTime = jobEndTime; 157 } 158 jobId = bundleBean.getId(); 159 LOG.info("[" + jobId + "]: Update status to PREP"); 160 bundleBean.setStatus(Job.Status.PREP); 161 try { 162 new XConfiguration(new StringReader(bundleBean.getConf())); 163 } 164 catch (IOException e1) { 165 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1); 166 } 167 String output = bundleBean.getJobXml() + System.getProperty("line.separator"); 168 return output; 169 } 170 else { 171 if (bundleBean.getKickoffTime() == null) { 172 // If there is no KickOffTime, default kickoff is NOW. 173 LOG.debug("Since kickoff time is not defined for job id " + jobId 174 + ". Queuing and BundleStartXCommand immediately after submission"); 175 queue(new BundleStartXCommand(jobId)); 176 } 177 } 178 } 179 catch (Exception ex) { 180 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex); 181 } 182 LOG.info("ENDED Bundle Submit"); 183 return this.jobId; 184 } 185 186 /* (non-Javadoc) 187 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 188 */ 189 @Override 190 public void notifyParent() throws CommandException { 191 } 192 193 /* (non-Javadoc) 194 * @see org.apache.oozie.command.XCommand#getEntityKey() 195 */ 196 @Override 197 protected String getEntityKey() { 198 return null; 199 } 200 201 /* (non-Javadoc) 202 * @see org.apache.oozie.command.XCommand#isLockRequired() 203 */ 204 @Override 205 protected boolean isLockRequired() { 206 return false; 207 } 208 209 /* (non-Javadoc) 210 * @see org.apache.oozie.command.XCommand#loadState() 211 */ 212 @Override 213 protected void loadState() throws CommandException { 214 } 215 216 /* (non-Javadoc) 217 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 218 */ 219 @Override 220 protected void verifyPrecondition() throws CommandException, PreconditionException { 221 } 222 223 /* (non-Javadoc) 224 * @see org.apache.oozie.command.XCommand#eagerLoadState() 225 */ 226 @Override 227 protected void eagerLoadState() throws CommandException { 228 super.eagerLoadState(); 229 jpaService = Services.get().get(JPAService.class); 230 if (jpaService == null) { 231 throw new CommandException(ErrorCode.E0610); 232 } 233 } 234 235 /* (non-Javadoc) 236 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition() 237 */ 238 @Override 239 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException { 240 try { 241 super.eagerVerifyPrecondition(); 242 mergeDefaultConfig(); 243 String appXml = readAndValidateXml(); 244 bundleBean.setOrigJobXml(appXml); 245 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString()); 246 } 247 catch (BundleJobException ex) { 248 LOG.warn("BundleJobException: ", ex); 249 throw new CommandException(ex); 250 } 251 catch (IllegalArgumentException iex) { 252 LOG.warn("IllegalArgumentException: ", iex); 253 throw new CommandException(ErrorCode.E1310, iex); 254 } 255 catch (Exception ex) { 256 LOG.warn("Exception: ", ex); 257 throw new CommandException(ErrorCode.E1310, ex); 258 } 259 } 260 261 /** 262 * Merge default configuration with user-defined configuration. 263 * 264 * @throws CommandException thrown if failed to merge configuration 265 */ 266 protected void mergeDefaultConfig() throws CommandException { 267 Path configDefault = null; 268 try { 269 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH); 270 Path bundleAppPath = new Path(bundleAppPathStr); 271 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 272 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME); 273 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, bundleAppPath.toUri(), 274 new Configuration()); 275 276 // app path could be a directory 277 if (!fs.isFile(bundleAppPath)) { 278 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT); 279 } else { 280 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT); 281 } 282 283 if (fs.exists(configDefault)) { 284 Configuration defaultConf = new XConfiguration(fs.open(configDefault)); 285 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES); 286 XConfiguration.injectDefaults(defaultConf, conf); 287 } 288 else { 289 LOG.info("configDefault Doesn't exist " + configDefault); 290 } 291 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES); 292 } 293 catch (IOException e) { 294 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config " 295 + configDefault, e); 296 } 297 catch (HadoopAccessorException e) { 298 throw new CommandException(e); 299 } 300 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString()); 301 } 302 303 /** 304 * Read the application XML and validate against bundle Schema 305 * 306 * @return validated bundle XML 307 * @throws BundleJobException thrown if failed to read or validate xml 308 */ 309 private String readAndValidateXml() throws BundleJobException { 310 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH); 311 String bundleXml = readDefinition(appPath); 312 validateXml(bundleXml); 313 return bundleXml; 314 } 315 316 /** 317 * Read bundle definition. 318 * 319 * @param appPath application path. 320 * @param user user name. 321 * @param group group name. 322 * @param autToken authentication token. 323 * @return bundle definition. 324 * @throws BundleJobException thrown if the definition could not be read. 325 */ 326 protected String readDefinition(String appPath) throws BundleJobException { 327 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 328 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME); 329 //Configuration confHadoop = CoordUtils.getHadoopConf(conf); 330 try { 331 URI uri = new URI(appPath); 332 LOG.debug("user =" + user + " group =" + group); 333 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri, 334 new Configuration()); 335 Path appDefPath = null; 336 337 // app path could be a directory 338 Path path = new Path(uri.getPath()); 339 if (!fs.isFile(path)) { 340 appDefPath = new Path(path, BUNDLE_XML_FILE); 341 } else { 342 appDefPath = path; 343 } 344 345 Reader reader = new InputStreamReader(fs.open(appDefPath)); 346 StringWriter writer = new StringWriter(); 347 IOUtils.copyCharStream(reader, writer); 348 return writer.toString(); 349 } 350 catch (IOException ex) { 351 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex); 352 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 353 } 354 catch (URISyntaxException ex) { 355 LOG.warn("URISyException :" + ex.getMessage()); 356 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex); 357 } 358 catch (HadoopAccessorException ex) { 359 throw new BundleJobException(ex); 360 } 361 catch (Exception ex) { 362 LOG.warn("Exception :", ex); 363 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex); 364 } 365 } 366 367 /** 368 * Validate against Bundle XSD file 369 * 370 * @param xmlContent input bundle xml 371 * @throws BundleJobException thrown if failed to validate xml 372 */ 373 private void validateXml(String xmlContent) throws BundleJobException { 374 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE); 375 Validator validator = schema.newValidator(); 376 try { 377 validator.validate(new StreamSource(new StringReader(xmlContent))); 378 } 379 catch (SAXException ex) { 380 LOG.warn("SAXException :", ex); 381 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex); 382 } 383 catch (IOException ex) { 384 LOG.warn("IOException :", ex); 385 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex); 386 } 387 } 388 389 /** 390 * Write a Bundle Job into database 391 * 392 * @param Bundle job bean 393 * @return job id 394 * @throws CommandException thrown if failed to store bundle job bean to db 395 */ 396 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException { 397 try { 398 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE); 399 400 bundleJob.setId(jobId); 401 bundleJob.setAuthToken(this.authToken); 402 bundleJob.setAppName(XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name")); 403 bundleJob.setAppName(bundleJob.getAppName()); 404 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH)); 405 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class. 406 bundleJob.setCreatedTime(new Date()); 407 bundleJob.setUser(conf.get(OozieClient.USER_NAME)); 408 bundleJob.setGroup(conf.get(OozieClient.GROUP_NAME)); 409 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString()); 410 bundleJob.setJobXml(resolvedJobXml); 411 Element jobElement = XmlUtils.parseXml(resolvedJobXml); 412 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace()); 413 if (controlsElement != null) { 414 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace()); 415 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) { 416 Date kickoffTime = DateUtils.parseDateUTC(kickoffTimeElement.getValue()); 417 bundleJob.setKickoffTime(kickoffTime); 418 } 419 } 420 bundleJob.setLastModifiedTime(new Date()); 421 422 if (!dryrun) { 423 jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob)); 424 } 425 } 426 catch (Exception ex) { 427 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex); 428 } 429 return jobId; 430 } 431 432 /* (non-Javadoc) 433 * @see org.apache.oozie.command.TransitionXCommand#getJob() 434 */ 435 @Override 436 public Job getJob() { 437 return bundleBean; 438 } 439 440 /** 441 * Resolve job xml with conf 442 * 443 * @param bundleXml bundle job xml 444 * @param conf job configuration 445 * @return resolved job xml 446 * @throws BundleJobException thrown if failed to resolve variables 447 */ 448 private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException { 449 try { 450 ELEvaluator eval = createEvaluator(conf); 451 return eval.evaluate(bundleXml, String.class); 452 } 453 catch (Exception e) { 454 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 455 } 456 } 457 458 /** 459 * Create ELEvaluator 460 * 461 * @param conf job configuration 462 * @return ELEvaluator the evaluator for el function 463 * @throws BundleJobException thrown if failed to create evaluator 464 */ 465 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException { 466 ELEvaluator eval; 467 ELEvaluator.Context context; 468 try { 469 context = new ELEvaluator.Context(); 470 eval = new ELEvaluator(context); 471 for (Map.Entry<String, String> entry : conf) { 472 eval.setVariable(entry.getKey(), entry.getValue()); 473 } 474 } 475 catch (Exception e) { 476 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e); 477 } 478 return eval; 479 } 480 481 /** 482 * Verify the uniqueness of coordinator names 483 * 484 * @param resolved job xml 485 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names 486 */ 487 @SuppressWarnings("unchecked") 488 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException { 489 Set<String> set = new HashSet<String>(); 490 try { 491 Element bAppXml = XmlUtils.parseXml(resolvedJobXml); 492 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace()); 493 for (Element elem : coordElems) { 494 Attribute name = elem.getAttribute("name"); 495 if (name != null) { 496 if (set.contains(name.getValue())) { 497 throw new CommandException(ErrorCode.E1304, name); 498 } 499 set.add(name.getValue()); 500 } 501 else { 502 throw new CommandException(ErrorCode.E1305); 503 } 504 } 505 } 506 catch (JDOMException jex) { 507 throw new CommandException(ErrorCode.E1301, jex); 508 } 509 510 return null; 511 } 512 513 /* (non-Javadoc) 514 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 515 */ 516 @Override 517 public void updateJob() throws CommandException { 518 } 519 }