This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.io.IOException;
021 import java.io.InputStreamReader;
022 import java.io.Reader;
023 import java.io.StringReader;
024 import java.io.StringWriter;
025 import java.net.URI;
026 import java.net.URISyntaxException;
027 import java.util.Date;
028 import java.util.HashSet;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032
033 import javax.xml.transform.stream.StreamSource;
034 import javax.xml.validation.Validator;
035
036 import org.apache.hadoop.conf.Configuration;
037 import org.apache.hadoop.fs.FileSystem;
038 import org.apache.hadoop.fs.Path;
039 import org.apache.oozie.BundleJobBean;
040 import org.apache.oozie.ErrorCode;
041 import org.apache.oozie.client.Job;
042 import org.apache.oozie.client.OozieClient;
043 import org.apache.oozie.command.CommandException;
044 import org.apache.oozie.command.PreconditionException;
045 import org.apache.oozie.command.SubmitTransitionXCommand;
046 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
047 import org.apache.oozie.service.HadoopAccessorException;
048 import org.apache.oozie.service.HadoopAccessorService;
049 import org.apache.oozie.service.JPAService;
050 import org.apache.oozie.service.SchemaService;
051 import org.apache.oozie.service.Services;
052 import org.apache.oozie.service.UUIDService;
053 import org.apache.oozie.service.SchemaService.SchemaName;
054 import org.apache.oozie.service.UUIDService.ApplicationType;
055 import org.apache.oozie.util.ConfigUtils;
056 import org.apache.oozie.util.DateUtils;
057 import org.apache.oozie.util.ELEvaluator;
058 import org.apache.oozie.util.ELUtils;
059 import org.apache.oozie.util.IOUtils;
060 import org.apache.oozie.util.InstrumentUtils;
061 import org.apache.oozie.util.LogUtils;
062 import org.apache.oozie.util.ParamChecker;
063 import org.apache.oozie.util.PropertiesUtils;
064 import org.apache.oozie.util.XConfiguration;
065 import org.apache.oozie.util.XmlUtils;
066 import org.apache.oozie.util.ParameterVerifier;
067 import org.jdom.Attribute;
068 import org.jdom.Element;
069 import org.jdom.JDOMException;
070 import org.xml.sax.SAXException;
071
072 /**
073 * This Command will submit the bundle.
074 */
075 public class BundleSubmitXCommand extends SubmitTransitionXCommand {
076
077 private Configuration conf;
078 private final String authToken;
079 public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
080 public static final String BUNDLE_XML_FILE = "bundle.xml";
081 private final BundleJobBean bundleBean = new BundleJobBean();
082 private String jobId;
083 private JPAService jpaService = null;
084
085 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
086 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
087
088 static {
089 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
090 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
091 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
092 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
093 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
094 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
095
096 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
097 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
098 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
099 }
100
101 /**
102 * Constructor to create the bundle submit command.
103 *
104 * @param conf configuration for bundle job
105 * @param authToken to be used for authentication
106 */
107 public BundleSubmitXCommand(Configuration conf, String authToken) {
108 super("bundle_submit", "bundle_submit", 1);
109 this.conf = ParamChecker.notNull(conf, "conf");
110 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
111 }
112
113 /**
114 * Constructor to create the bundle submit command.
115 *
116 * @param dryrun true if dryrun is enable
117 * @param conf configuration for bundle job
118 * @param authToken to be used for authentication
119 */
120 public BundleSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
121 this(conf, authToken);
122 this.dryrun = dryrun;
123 }
124
125 /* (non-Javadoc)
126 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
127 */
128 @Override
129 protected String submit() throws CommandException {
130 LOG.info("STARTED Bundle Submit");
131 try {
132 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
133
134 ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml()));
135
136 XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
137 // Resolving all variables in the job properties.
138 // This ensures the Hadoop Configuration semantics is preserved.
139 XConfiguration resolvedVarsConf = new XConfiguration();
140 for (Map.Entry<String, String> entry : conf) {
141 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
142 }
143 conf = resolvedVarsConf;
144
145 String resolvedJobXml = resolvedVars(bundleBean.getOrigJobXml(), conf);
146
147 //verify the uniqueness of coord names
148 verifyCoordNameUnique(resolvedJobXml);
149 this.jobId = storeToDB(bundleBean, resolvedJobXml);
150 LogUtils.setLogInfo(bundleBean, logInfo);
151
152 if (dryrun) {
153 Date startTime = bundleBean.getStartTime();
154 long startTimeMilli = startTime.getTime();
155 long endTimeMilli = startTimeMilli + (3600 * 1000);
156 Date jobEndTime = bundleBean.getEndTime();
157 Date endTime = new Date(endTimeMilli);
158 if (endTime.compareTo(jobEndTime) > 0) {
159 endTime = jobEndTime;
160 }
161 jobId = bundleBean.getId();
162 LOG.info("[" + jobId + "]: Update status to PREP");
163 bundleBean.setStatus(Job.Status.PREP);
164 try {
165 new XConfiguration(new StringReader(bundleBean.getConf()));
166 }
167 catch (IOException e1) {
168 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
169 }
170 String output = bundleBean.getJobXml() + System.getProperty("line.separator");
171 return output;
172 }
173 else {
174 if (bundleBean.getKickoffTime() == null) {
175 // If there is no KickOffTime, default kickoff is NOW.
176 LOG.debug("Since kickoff time is not defined for job id " + jobId
177 + ". Queuing and BundleStartXCommand immediately after submission");
178 queue(new BundleStartXCommand(jobId));
179 }
180 }
181 }
182 catch (Exception ex) {
183 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
184 }
185 LOG.info("ENDED Bundle Submit");
186 return this.jobId;
187 }
188
189 /* (non-Javadoc)
190 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
191 */
192 @Override
193 public void notifyParent() throws CommandException {
194 }
195
196 /* (non-Javadoc)
197 * @see org.apache.oozie.command.XCommand#getEntityKey()
198 */
199 @Override
200 public String getEntityKey() {
201 return null;
202 }
203
204 /* (non-Javadoc)
205 * @see org.apache.oozie.command.XCommand#isLockRequired()
206 */
207 @Override
208 protected boolean isLockRequired() {
209 return false;
210 }
211
212 /* (non-Javadoc)
213 * @see org.apache.oozie.command.XCommand#loadState()
214 */
215 @Override
216 protected void loadState() throws CommandException {
217 }
218
219 /* (non-Javadoc)
220 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
221 */
222 @Override
223 protected void verifyPrecondition() throws CommandException, PreconditionException {
224 }
225
226 /* (non-Javadoc)
227 * @see org.apache.oozie.command.XCommand#eagerLoadState()
228 */
229 @Override
230 protected void eagerLoadState() throws CommandException {
231 super.eagerLoadState();
232 jpaService = Services.get().get(JPAService.class);
233 if (jpaService == null) {
234 throw new CommandException(ErrorCode.E0610);
235 }
236 }
237
238 /* (non-Javadoc)
239 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
240 */
241 @Override
242 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
243 try {
244 super.eagerVerifyPrecondition();
245 mergeDefaultConfig();
246 String appXml = readAndValidateXml();
247 bundleBean.setOrigJobXml(appXml);
248 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
249 }
250 catch (BundleJobException ex) {
251 LOG.warn("BundleJobException: ", ex);
252 throw new CommandException(ex);
253 }
254 catch (IllegalArgumentException iex) {
255 LOG.warn("IllegalArgumentException: ", iex);
256 throw new CommandException(ErrorCode.E1310, iex);
257 }
258 catch (Exception ex) {
259 LOG.warn("Exception: ", ex);
260 throw new CommandException(ErrorCode.E1310, ex);
261 }
262 }
263
264 /**
265 * Merge default configuration with user-defined configuration.
266 *
267 * @throws CommandException thrown if failed to merge configuration
268 */
269 protected void mergeDefaultConfig() throws CommandException {
270 Path configDefault = null;
271 try {
272 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
273 Path bundleAppPath = new Path(bundleAppPathStr);
274 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
275 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
276 Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
277 FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
278
279 // app path could be a directory
280 if (!fs.isFile(bundleAppPath)) {
281 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
282 } else {
283 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
284 }
285
286 if (fs.exists(configDefault)) {
287 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
288 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
289 XConfiguration.injectDefaults(defaultConf, conf);
290 }
291 else {
292 LOG.info("configDefault Doesn't exist " + configDefault);
293 }
294 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
295 }
296 catch (IOException e) {
297 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
298 + configDefault, e);
299 }
300 catch (HadoopAccessorException e) {
301 throw new CommandException(e);
302 }
303 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
304 }
305
306 /**
307 * Read the application XML and validate against bundle Schema
308 *
309 * @return validated bundle XML
310 * @throws BundleJobException thrown if failed to read or validate xml
311 */
312 private String readAndValidateXml() throws BundleJobException {
313 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
314 String bundleXml = readDefinition(appPath);
315 validateXml(bundleXml);
316 return bundleXml;
317 }
318
319 /**
320 * Read bundle definition.
321 *
322 * @param appPath application path.
323 * @param user user name.
324 * @param group group name.
325 * @param autToken authentication token.
326 * @return bundle definition.
327 * @throws BundleJobException thrown if the definition could not be read.
328 */
329 protected String readDefinition(String appPath) throws BundleJobException {
330 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
331 //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
332 try {
333 URI uri = new URI(appPath);
334 LOG.debug("user =" + user);
335 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
336 Configuration fsConf = has.createJobConf(uri.getAuthority());
337 FileSystem fs = has.createFileSystem(user, uri, fsConf);
338 Path appDefPath = null;
339
340 // app path could be a directory
341 Path path = new Path(uri.getPath());
342 if (!fs.isFile(path)) {
343 appDefPath = new Path(path, BUNDLE_XML_FILE);
344 } else {
345 appDefPath = path;
346 }
347
348 Reader reader = new InputStreamReader(fs.open(appDefPath));
349 StringWriter writer = new StringWriter();
350 IOUtils.copyCharStream(reader, writer);
351 return writer.toString();
352 }
353 catch (IOException ex) {
354 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
355 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
356 }
357 catch (URISyntaxException ex) {
358 LOG.warn("URISyException :" + ex.getMessage());
359 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
360 }
361 catch (HadoopAccessorException ex) {
362 throw new BundleJobException(ex);
363 }
364 catch (Exception ex) {
365 LOG.warn("Exception :", ex);
366 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
367 }
368 }
369
370 /**
371 * Validate against Bundle XSD file
372 *
373 * @param xmlContent input bundle xml
374 * @throws BundleJobException thrown if failed to validate xml
375 */
376 private void validateXml(String xmlContent) throws BundleJobException {
377 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
378 Validator validator = schema.newValidator();
379 try {
380 validator.validate(new StreamSource(new StringReader(xmlContent)));
381 }
382 catch (SAXException ex) {
383 LOG.warn("SAXException :", ex);
384 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
385 }
386 catch (IOException ex) {
387 LOG.warn("IOException :", ex);
388 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
389 }
390 }
391
392 /**
393 * Write a Bundle Job into database
394 *
395 * @param Bundle job bean
396 * @return job id
397 * @throws CommandException thrown if failed to store bundle job bean to db
398 */
399 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
400 try {
401 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
402
403 bundleJob.setId(jobId);
404 bundleJob.setAuthToken(this.authToken);
405 String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name");
406 name = ELUtils.resolveAppName(name, conf);
407 bundleJob.setAppName(name);
408 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
409 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
410 bundleJob.setCreatedTime(new Date());
411 bundleJob.setUser(conf.get(OozieClient.USER_NAME));
412 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
413 bundleJob.setGroup(group);
414 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
415 bundleJob.setJobXml(resolvedJobXml);
416 Element jobElement = XmlUtils.parseXml(resolvedJobXml);
417 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
418 if (controlsElement != null) {
419 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
420 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
421 Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue());
422 bundleJob.setKickoffTime(kickoffTime);
423 }
424 }
425 bundleJob.setLastModifiedTime(new Date());
426
427 if (!dryrun) {
428 jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
429 }
430 }
431 catch (Exception ex) {
432 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
433 }
434 return jobId;
435 }
436
437 /* (non-Javadoc)
438 * @see org.apache.oozie.command.TransitionXCommand#getJob()
439 */
440 @Override
441 public Job getJob() {
442 return bundleBean;
443 }
444
445 /**
446 * Resolve job xml with conf
447 *
448 * @param bundleXml bundle job xml
449 * @param conf job configuration
450 * @return resolved job xml
451 * @throws BundleJobException thrown if failed to resolve variables
452 */
453 private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
454 try {
455 ELEvaluator eval = createEvaluator(conf);
456 return eval.evaluate(bundleXml, String.class);
457 }
458 catch (Exception e) {
459 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
460 }
461 }
462
463 /**
464 * Create ELEvaluator
465 *
466 * @param conf job configuration
467 * @return ELEvaluator the evaluator for el function
468 * @throws BundleJobException thrown if failed to create evaluator
469 */
470 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
471 ELEvaluator eval;
472 ELEvaluator.Context context;
473 try {
474 context = new ELEvaluator.Context();
475 eval = new ELEvaluator(context);
476 for (Map.Entry<String, String> entry : conf) {
477 eval.setVariable(entry.getKey(), entry.getValue());
478 }
479 }
480 catch (Exception e) {
481 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
482 }
483 return eval;
484 }
485
486 /**
487 * Verify the uniqueness of coordinator names
488 *
489 * @param resolved job xml
490 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
491 */
492 @SuppressWarnings("unchecked")
493 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
494 Set<String> set = new HashSet<String>();
495 try {
496 Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
497 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
498 for (Element elem : coordElems) {
499 Attribute name = elem.getAttribute("name");
500 if (name != null) {
501 if (set.contains(name.getValue())) {
502 throw new CommandException(ErrorCode.E1304, name);
503 }
504 set.add(name.getValue());
505 }
506 else {
507 throw new CommandException(ErrorCode.E1305);
508 }
509 }
510 }
511 catch (JDOMException jex) {
512 throw new CommandException(ErrorCode.E1301, jex);
513 }
514
515 return null;
516 }
517
518 /* (non-Javadoc)
519 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
520 */
521 @Override
522 public void updateJob() throws CommandException {
523 }
524
525 @Override
526 public void performWrites() throws CommandException {
527 }
528 }