This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.io.IOException;
021 import java.io.InputStreamReader;
022 import java.io.Reader;
023 import java.io.StringReader;
024 import java.io.StringWriter;
025 import java.net.URI;
026 import java.net.URISyntaxException;
027 import java.util.Date;
028 import java.util.HashSet;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032
033 import javax.xml.transform.stream.StreamSource;
034 import javax.xml.validation.Validator;
035
036 import org.apache.hadoop.conf.Configuration;
037 import org.apache.hadoop.fs.FileSystem;
038 import org.apache.hadoop.fs.Path;
039 import org.apache.oozie.BundleJobBean;
040 import org.apache.oozie.ErrorCode;
041 import org.apache.oozie.client.Job;
042 import org.apache.oozie.client.OozieClient;
043 import org.apache.oozie.command.CommandException;
044 import org.apache.oozie.command.PreconditionException;
045 import org.apache.oozie.command.SubmitTransitionXCommand;
046 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
047 import org.apache.oozie.service.HadoopAccessorException;
048 import org.apache.oozie.service.HadoopAccessorService;
049 import org.apache.oozie.service.JPAService;
050 import org.apache.oozie.service.SchemaService;
051 import org.apache.oozie.service.Services;
052 import org.apache.oozie.service.UUIDService;
053 import org.apache.oozie.service.SchemaService.SchemaName;
054 import org.apache.oozie.service.UUIDService.ApplicationType;
055 import org.apache.oozie.util.ConfigUtils;
056 import org.apache.oozie.util.DateUtils;
057 import org.apache.oozie.util.ELEvaluator;
058 import org.apache.oozie.util.ELUtils;
059 import org.apache.oozie.util.IOUtils;
060 import org.apache.oozie.util.InstrumentUtils;
061 import org.apache.oozie.util.LogUtils;
062 import org.apache.oozie.util.ParamChecker;
063 import org.apache.oozie.util.PropertiesUtils;
064 import org.apache.oozie.util.XConfiguration;
065 import org.apache.oozie.util.XmlUtils;
066 import org.apache.oozie.util.ParameterVerifier;
067 import org.jdom.Attribute;
068 import org.jdom.Element;
069 import org.jdom.JDOMException;
070 import org.xml.sax.SAXException;
071
072 /**
073 * This Command will submit the bundle.
074 */
075 public class BundleSubmitXCommand extends SubmitTransitionXCommand {
076
/** Name of the optional defaults file looked up beside the bundle application. */
public static final String CONFIG_DEFAULT = "bundle-config-default.xml";

/** Name of the bundle definition file used when the application path is not a file. */
public static final String BUNDLE_XML_FILE = "bundle.xml";

/** Property names rejected when found in the submitted job configuration. */
private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

/** Property names rejected when found in the defaults file. */
private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

/** Job configuration; replaced by a resolved copy during {@code submit()}. */
private Configuration conf;

/** Bean that accumulates the state of the bundle job being submitted. */
private final BundleJobBean bundleBean = new BundleJobBean();

/** Id generated for the job while it is stored. */
private String jobId;

/** JPA service, looked up in {@code eagerLoadState()}. */
private JPAService jpaService = null;

static {
    final String[] reservedNames = {
        PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
        PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
        PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
        PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
        PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS
    };
    final String[] reservedForDefaultsOnly = { PropertiesUtils.HADOOP_USER };

    // User-supplied configuration: only the reserved names are registered.
    PropertiesUtils.createPropertySet(reservedNames, DISALLOWED_USER_PROPERTIES);

    // Defaults file: the same reserved names plus the Hadoop user property.
    PropertiesUtils.createPropertySet(reservedNames, DISALLOWED_DEFAULT_PROPERTIES);
    PropertiesUtils.createPropertySet(reservedForDefaultsOnly, DISALLOWED_DEFAULT_PROPERTIES);
}
099
100 /**
101 * Constructor to create the bundle submit command.
102 *
103 * @param conf configuration for bundle job
104 */
105 public BundleSubmitXCommand(Configuration conf) {
106 super("bundle_submit", "bundle_submit", 1);
107 this.conf = ParamChecker.notNull(conf, "conf");
108 }
109
/**
 * Creates a bundle submit command, optionally in dry-run mode.
 *
 * @param dryrun {@code true} to enable dry-run mode
 * @param conf configuration for the bundle job
 */
public BundleSubmitXCommand(boolean dryrun, Configuration conf) {
    // Delegate the common set-up, then record the dry-run flag.
    this(conf);
    this.dryrun = dryrun;
}
120
121 /* (non-Javadoc)
122 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
123 */
124 @Override
125 protected String submit() throws CommandException {
126 LOG.info("STARTED Bundle Submit");
127 try {
128 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
129
130 ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml()));
131
132 String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
133 // Resolving all variables in the job properties.
134 // This ensures the Hadoop Configuration semantics is preserved.
135 XConfiguration resolvedVarsConf = new XConfiguration();
136 for (Map.Entry<String, String> entry : conf) {
137 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
138 }
139 conf = resolvedVarsConf;
140
141 String resolvedJobXml = resolvedVars(jobXmlWithNoComment, conf);
142
143 //verify the uniqueness of coord names
144 verifyCoordNameUnique(resolvedJobXml);
145 this.jobId = storeToDB(bundleBean, resolvedJobXml);
146 LogUtils.setLogInfo(bundleBean, logInfo);
147
148 if (dryrun) {
149 Date startTime = bundleBean.getStartTime();
150 long startTimeMilli = startTime.getTime();
151 long endTimeMilli = startTimeMilli + (3600 * 1000);
152 Date jobEndTime = bundleBean.getEndTime();
153 Date endTime = new Date(endTimeMilli);
154 if (endTime.compareTo(jobEndTime) > 0) {
155 endTime = jobEndTime;
156 }
157 jobId = bundleBean.getId();
158 LOG.info("[" + jobId + "]: Update status to PREP");
159 bundleBean.setStatus(Job.Status.PREP);
160 try {
161 new XConfiguration(new StringReader(bundleBean.getConf()));
162 }
163 catch (IOException e1) {
164 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
165 }
166 String output = bundleBean.getJobXml() + System.getProperty("line.separator");
167 return output;
168 }
169 else {
170 if (bundleBean.getKickoffTime() == null) {
171 // If there is no KickOffTime, default kickoff is NOW.
172 LOG.debug("Since kickoff time is not defined for job id " + jobId
173 + ". Queuing and BundleStartXCommand immediately after submission");
174 queue(new BundleStartXCommand(jobId));
175 }
176 }
177 }
178 catch (Exception ex) {
179 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
180 }
181 LOG.info("ENDED Bundle Submit");
182 return this.jobId;
183 }
184
185 /* (non-Javadoc)
186 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
187 */
188 @Override
189 public void notifyParent() throws CommandException {
190 }
191
192 /* (non-Javadoc)
193 * @see org.apache.oozie.command.XCommand#getEntityKey()
194 */
195 @Override
196 public String getEntityKey() {
197 return null;
198 }
199
200 /* (non-Javadoc)
201 * @see org.apache.oozie.command.XCommand#isLockRequired()
202 */
203 @Override
204 protected boolean isLockRequired() {
205 return false;
206 }
207
208 /* (non-Javadoc)
209 * @see org.apache.oozie.command.XCommand#loadState()
210 */
211 @Override
212 protected void loadState() throws CommandException {
213 }
214
215 /* (non-Javadoc)
216 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
217 */
218 @Override
219 protected void verifyPrecondition() throws CommandException, PreconditionException {
220 }
221
222 /* (non-Javadoc)
223 * @see org.apache.oozie.command.XCommand#eagerLoadState()
224 */
225 @Override
226 protected void eagerLoadState() throws CommandException {
227 super.eagerLoadState();
228 jpaService = Services.get().get(JPAService.class);
229 if (jpaService == null) {
230 throw new CommandException(ErrorCode.E0610);
231 }
232 }
233
234 /* (non-Javadoc)
235 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
236 */
237 @Override
238 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
239 try {
240 super.eagerVerifyPrecondition();
241 mergeDefaultConfig();
242 String appXml = readAndValidateXml();
243 bundleBean.setOrigJobXml(appXml);
244 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
245 }
246 catch (BundleJobException ex) {
247 LOG.warn("BundleJobException: ", ex);
248 throw new CommandException(ex);
249 }
250 catch (IllegalArgumentException iex) {
251 LOG.warn("IllegalArgumentException: ", iex);
252 throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex);
253 }
254 catch (Exception ex) {
255 LOG.warn("Exception: ", ex);
256 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
257 }
258 }
259
260 /**
261 * Merge default configuration with user-defined configuration.
262 *
263 * @throws CommandException thrown if failed to merge configuration
264 */
265 protected void mergeDefaultConfig() throws CommandException {
266 Path configDefault = null;
267 try {
268 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
269 Path bundleAppPath = new Path(bundleAppPathStr);
270 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
271 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
272 Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
273 FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
274
275 // app path could be a directory
276 if (!fs.isFile(bundleAppPath)) {
277 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
278 } else {
279 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
280 }
281
282 if (fs.exists(configDefault)) {
283 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
284 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
285 XConfiguration.injectDefaults(defaultConf, conf);
286 }
287 else {
288 LOG.info("configDefault Doesn't exist " + configDefault);
289 }
290 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
291 }
292 catch (IOException e) {
293 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
294 + configDefault, e);
295 }
296 catch (HadoopAccessorException e) {
297 throw new CommandException(e);
298 }
299 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
300 }
301
302 /**
303 * Read the application XML and validate against bundle Schema
304 *
305 * @return validated bundle XML
306 * @throws BundleJobException thrown if failed to read or validate xml
307 */
308 private String readAndValidateXml() throws BundleJobException {
309 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
310 String bundleXml = readDefinition(appPath);
311 validateXml(bundleXml);
312 return bundleXml;
313 }
314
315 /**
316 * Read bundle definition.
317 *
318 * @param appPath application path.
319 * @param user user name.
320 * @param group group name.
321 * @return bundle definition.
322 * @throws BundleJobException thrown if the definition could not be read.
323 */
324 protected String readDefinition(String appPath) throws BundleJobException {
325 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
326 //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
327 try {
328 URI uri = new URI(appPath);
329 LOG.debug("user =" + user);
330 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
331 Configuration fsConf = has.createJobConf(uri.getAuthority());
332 FileSystem fs = has.createFileSystem(user, uri, fsConf);
333 Path appDefPath = null;
334
335 // app path could be a directory
336 Path path = new Path(uri.getPath());
337 if (!fs.isFile(path)) {
338 appDefPath = new Path(path, BUNDLE_XML_FILE);
339 } else {
340 appDefPath = path;
341 }
342
343 Reader reader = new InputStreamReader(fs.open(appDefPath));
344 StringWriter writer = new StringWriter();
345 IOUtils.copyCharStream(reader, writer);
346 return writer.toString();
347 }
348 catch (IOException ex) {
349 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
350 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
351 }
352 catch (URISyntaxException ex) {
353 LOG.warn("URISyException :" + ex.getMessage());
354 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
355 }
356 catch (HadoopAccessorException ex) {
357 throw new BundleJobException(ex);
358 }
359 catch (Exception ex) {
360 LOG.warn("Exception :", ex);
361 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
362 }
363 }
364
365 /**
366 * Validate against Bundle XSD file
367 *
368 * @param xmlContent input bundle xml
369 * @throws BundleJobException thrown if failed to validate xml
370 */
371 private void validateXml(String xmlContent) throws BundleJobException {
372 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
373 Validator validator = schema.newValidator();
374 try {
375 validator.validate(new StreamSource(new StringReader(xmlContent)));
376 }
377 catch (SAXException ex) {
378 LOG.warn("SAXException :", ex);
379 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
380 }
381 catch (IOException ex) {
382 LOG.warn("IOException :", ex);
383 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
384 }
385 }
386
387 /**
388 * Write a Bundle Job into database
389 *
390 * @param Bundle job bean
391 * @return job id
392 * @throws CommandException thrown if failed to store bundle job bean to db
393 */
394 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
395 try {
396 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
397
398 bundleJob.setId(jobId);
399 String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name");
400 name = ELUtils.resolveAppName(name, conf);
401 bundleJob.setAppName(name);
402 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
403 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
404 bundleJob.setCreatedTime(new Date());
405 bundleJob.setUser(conf.get(OozieClient.USER_NAME));
406 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
407 bundleJob.setGroup(group);
408 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
409 bundleJob.setJobXml(resolvedJobXml);
410 Element jobElement = XmlUtils.parseXml(resolvedJobXml);
411 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
412 if (controlsElement != null) {
413 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
414 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
415 Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue());
416 bundleJob.setKickoffTime(kickoffTime);
417 }
418 }
419 bundleJob.setLastModifiedTime(new Date());
420
421 if (!dryrun) {
422 jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
423 }
424 }
425 catch (Exception ex) {
426 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
427 }
428 return jobId;
429 }
430
431 /* (non-Javadoc)
432 * @see org.apache.oozie.command.TransitionXCommand#getJob()
433 */
434 @Override
435 public Job getJob() {
436 return bundleBean;
437 }
438
439 /**
440 * Resolve job xml with conf
441 *
442 * @param bundleXml bundle job xml
443 * @param conf job configuration
444 * @return resolved job xml
445 * @throws BundleJobException thrown if failed to resolve variables
446 */
447 private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
448 try {
449 ELEvaluator eval = createEvaluator(conf);
450 return eval.evaluate(bundleXml, String.class);
451 }
452 catch (Exception e) {
453 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
454 }
455 }
456
457 /**
458 * Create ELEvaluator
459 *
460 * @param conf job configuration
461 * @return ELEvaluator the evaluator for el function
462 * @throws BundleJobException thrown if failed to create evaluator
463 */
464 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
465 ELEvaluator eval;
466 ELEvaluator.Context context;
467 try {
468 context = new ELEvaluator.Context();
469 eval = new ELEvaluator(context);
470 for (Map.Entry<String, String> entry : conf) {
471 eval.setVariable(entry.getKey(), entry.getValue());
472 }
473 }
474 catch (Exception e) {
475 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
476 }
477 return eval;
478 }
479
480 /**
481 * Verify the uniqueness of coordinator names
482 *
483 * @param resolved job xml
484 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
485 */
486 @SuppressWarnings("unchecked")
487 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
488 Set<String> set = new HashSet<String>();
489 try {
490 Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
491 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
492 for (Element elem : coordElems) {
493 Attribute name = elem.getAttribute("name");
494 if (name != null) {
495 if (set.contains(name.getValue())) {
496 throw new CommandException(ErrorCode.E1304, name);
497 }
498 set.add(name.getValue());
499 }
500 else {
501 throw new CommandException(ErrorCode.E1305);
502 }
503 }
504 }
505 catch (JDOMException jex) {
506 throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
507 }
508
509 return null;
510 }
511
512 /* (non-Javadoc)
513 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
514 */
515 @Override
516 public void updateJob() throws CommandException {
517 }
518
519 @Override
520 public void performWrites() throws CommandException {
521 }
522 }