This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.io.IOException;
021 import java.io.InputStreamReader;
022 import java.io.Reader;
023 import java.io.StringReader;
024 import java.io.StringWriter;
025 import java.net.URI;
026 import java.net.URISyntaxException;
027 import java.util.Date;
028 import java.util.HashSet;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032
033 import javax.xml.transform.stream.StreamSource;
034 import javax.xml.validation.Validator;
035
036 import org.apache.hadoop.conf.Configuration;
037 import org.apache.hadoop.fs.FileSystem;
038 import org.apache.hadoop.fs.Path;
039 import org.apache.oozie.BundleJobBean;
040 import org.apache.oozie.ErrorCode;
041 import org.apache.oozie.client.Job;
042 import org.apache.oozie.client.OozieClient;
043 import org.apache.oozie.command.CommandException;
044 import org.apache.oozie.command.PreconditionException;
045 import org.apache.oozie.command.SubmitTransitionXCommand;
046 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
047 import org.apache.oozie.service.HadoopAccessorException;
048 import org.apache.oozie.service.HadoopAccessorService;
049 import org.apache.oozie.service.JPAService;
050 import org.apache.oozie.service.SchemaService;
051 import org.apache.oozie.service.Services;
052 import org.apache.oozie.service.UUIDService;
053 import org.apache.oozie.service.WorkflowAppService;
054 import org.apache.oozie.service.SchemaService.SchemaName;
055 import org.apache.oozie.service.UUIDService.ApplicationType;
056 import org.apache.oozie.util.ConfigUtils;
057 import org.apache.oozie.util.DateUtils;
058 import org.apache.oozie.util.ELEvaluator;
059 import org.apache.oozie.util.IOUtils;
060 import org.apache.oozie.util.InstrumentUtils;
061 import org.apache.oozie.util.LogUtils;
062 import org.apache.oozie.util.ParamChecker;
063 import org.apache.oozie.util.PropertiesUtils;
064 import org.apache.oozie.util.XConfiguration;
065 import org.apache.oozie.util.XmlUtils;
066 import org.jdom.Attribute;
067 import org.jdom.Element;
068 import org.jdom.JDOMException;
069 import org.xml.sax.SAXException;
070
071 /**
072 * This Command will submit the bundle.
073 */
074 public class BundleSubmitXCommand extends SubmitTransitionXCommand {
075
076 private Configuration conf;
077 private final String authToken;
078 public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
079 public static final String BUNDLE_XML_FILE = "bundle.xml";
080 private final BundleJobBean bundleBean = new BundleJobBean();
081 private String jobId;
082 private JPAService jpaService = null;
083
084 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
085 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
086
087 static {
088 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
089 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
090 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
091 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
092 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
093 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
094
095 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
096 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
097 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
098 }
099
100 /**
101 * Constructor to create the bundle submit command.
102 *
103 * @param conf configuration for bundle job
104 * @param authToken to be used for authentication
105 */
106 public BundleSubmitXCommand(Configuration conf, String authToken) {
107 super("bundle_submit", "bundle_submit", 1);
108 this.conf = ParamChecker.notNull(conf, "conf");
109 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
110 }
111
112 /**
113 * Constructor to create the bundle submit command.
114 *
115 * @param dryrun true if dryrun is enable
116 * @param conf configuration for bundle job
117 * @param authToken to be used for authentication
118 */
119 public BundleSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
120 this(conf, authToken);
121 this.dryrun = dryrun;
122 }
123
124 /* (non-Javadoc)
125 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
126 */
127 @Override
128 protected String submit() throws CommandException {
129 LOG.info("STARTED Bundle Submit");
130 try {
131 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
132
133 XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
134 // Resolving all variables in the job properties.
135 // This ensures the Hadoop Configuration semantics is preserved.
136 XConfiguration resolvedVarsConf = new XConfiguration();
137 for (Map.Entry<String, String> entry : conf) {
138 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
139 }
140 conf = resolvedVarsConf;
141
142 String resolvedJobXml = resolvedVars(bundleBean.getOrigJobXml(), conf);
143
144 //verify the uniqueness of coord names
145 verifyCoordNameUnique(resolvedJobXml);
146 this.jobId = storeToDB(bundleBean, resolvedJobXml);
147 LogUtils.setLogInfo(bundleBean, logInfo);
148
149 if (dryrun) {
150 Date startTime = bundleBean.getStartTime();
151 long startTimeMilli = startTime.getTime();
152 long endTimeMilli = startTimeMilli + (3600 * 1000);
153 Date jobEndTime = bundleBean.getEndTime();
154 Date endTime = new Date(endTimeMilli);
155 if (endTime.compareTo(jobEndTime) > 0) {
156 endTime = jobEndTime;
157 }
158 jobId = bundleBean.getId();
159 LOG.info("[" + jobId + "]: Update status to PREP");
160 bundleBean.setStatus(Job.Status.PREP);
161 try {
162 new XConfiguration(new StringReader(bundleBean.getConf()));
163 }
164 catch (IOException e1) {
165 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
166 }
167 String output = bundleBean.getJobXml() + System.getProperty("line.separator");
168 return output;
169 }
170 else {
171 if (bundleBean.getKickoffTime() == null) {
172 // If there is no KickOffTime, default kickoff is NOW.
173 LOG.debug("Since kickoff time is not defined for job id " + jobId
174 + ". Queuing and BundleStartXCommand immediately after submission");
175 queue(new BundleStartXCommand(jobId));
176 }
177 }
178 }
179 catch (Exception ex) {
180 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
181 }
182 LOG.info("ENDED Bundle Submit");
183 return this.jobId;
184 }
185
186 /* (non-Javadoc)
187 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
188 */
189 @Override
190 public void notifyParent() throws CommandException {
191 }
192
193 /* (non-Javadoc)
194 * @see org.apache.oozie.command.XCommand#getEntityKey()
195 */
196 @Override
197 public String getEntityKey() {
198 return null;
199 }
200
201 /* (non-Javadoc)
202 * @see org.apache.oozie.command.XCommand#isLockRequired()
203 */
204 @Override
205 protected boolean isLockRequired() {
206 return false;
207 }
208
209 /* (non-Javadoc)
210 * @see org.apache.oozie.command.XCommand#loadState()
211 */
212 @Override
213 protected void loadState() throws CommandException {
214 }
215
216 /* (non-Javadoc)
217 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
218 */
219 @Override
220 protected void verifyPrecondition() throws CommandException, PreconditionException {
221 }
222
223 /* (non-Javadoc)
224 * @see org.apache.oozie.command.XCommand#eagerLoadState()
225 */
226 @Override
227 protected void eagerLoadState() throws CommandException {
228 super.eagerLoadState();
229 jpaService = Services.get().get(JPAService.class);
230 if (jpaService == null) {
231 throw new CommandException(ErrorCode.E0610);
232 }
233 }
234
235 /* (non-Javadoc)
236 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
237 */
238 @Override
239 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
240 try {
241 super.eagerVerifyPrecondition();
242 mergeDefaultConfig();
243 String appXml = readAndValidateXml();
244 bundleBean.setOrigJobXml(appXml);
245 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
246 }
247 catch (BundleJobException ex) {
248 LOG.warn("BundleJobException: ", ex);
249 throw new CommandException(ex);
250 }
251 catch (IllegalArgumentException iex) {
252 LOG.warn("IllegalArgumentException: ", iex);
253 throw new CommandException(ErrorCode.E1310, iex);
254 }
255 catch (Exception ex) {
256 LOG.warn("Exception: ", ex);
257 throw new CommandException(ErrorCode.E1310, ex);
258 }
259 }
260
261 /**
262 * Merge default configuration with user-defined configuration.
263 *
264 * @throws CommandException thrown if failed to merge configuration
265 */
266 protected void mergeDefaultConfig() throws CommandException {
267 Path configDefault = null;
268 try {
269 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
270 Path bundleAppPath = new Path(bundleAppPathStr);
271 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
272 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
273 Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
274 FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
275
276 // app path could be a directory
277 if (!fs.isFile(bundleAppPath)) {
278 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
279 } else {
280 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
281 }
282
283 if (fs.exists(configDefault)) {
284 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
285 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
286 XConfiguration.injectDefaults(defaultConf, conf);
287 }
288 else {
289 LOG.info("configDefault Doesn't exist " + configDefault);
290 }
291 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
292 }
293 catch (IOException e) {
294 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
295 + configDefault, e);
296 }
297 catch (HadoopAccessorException e) {
298 throw new CommandException(e);
299 }
300 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
301 }
302
303 /**
304 * Read the application XML and validate against bundle Schema
305 *
306 * @return validated bundle XML
307 * @throws BundleJobException thrown if failed to read or validate xml
308 */
309 private String readAndValidateXml() throws BundleJobException {
310 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
311 String bundleXml = readDefinition(appPath);
312 validateXml(bundleXml);
313 return bundleXml;
314 }
315
316 /**
317 * Read bundle definition.
318 *
319 * @param appPath application path.
320 * @param user user name.
321 * @param group group name.
322 * @param autToken authentication token.
323 * @return bundle definition.
324 * @throws BundleJobException thrown if the definition could not be read.
325 */
326 protected String readDefinition(String appPath) throws BundleJobException {
327 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
328 //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
329 try {
330 URI uri = new URI(appPath);
331 LOG.debug("user =" + user);
332 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
333 Configuration fsConf = has.createJobConf(uri.getAuthority());
334 FileSystem fs = has.createFileSystem(user, uri, fsConf);
335 Path appDefPath = null;
336
337 // app path could be a directory
338 Path path = new Path(uri.getPath());
339 if (!fs.isFile(path)) {
340 appDefPath = new Path(path, BUNDLE_XML_FILE);
341 } else {
342 appDefPath = path;
343 }
344
345 Reader reader = new InputStreamReader(fs.open(appDefPath));
346 StringWriter writer = new StringWriter();
347 IOUtils.copyCharStream(reader, writer);
348 return writer.toString();
349 }
350 catch (IOException ex) {
351 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
352 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
353 }
354 catch (URISyntaxException ex) {
355 LOG.warn("URISyException :" + ex.getMessage());
356 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
357 }
358 catch (HadoopAccessorException ex) {
359 throw new BundleJobException(ex);
360 }
361 catch (Exception ex) {
362 LOG.warn("Exception :", ex);
363 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
364 }
365 }
366
367 /**
368 * Validate against Bundle XSD file
369 *
370 * @param xmlContent input bundle xml
371 * @throws BundleJobException thrown if failed to validate xml
372 */
373 private void validateXml(String xmlContent) throws BundleJobException {
374 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
375 Validator validator = schema.newValidator();
376 try {
377 validator.validate(new StreamSource(new StringReader(xmlContent)));
378 }
379 catch (SAXException ex) {
380 LOG.warn("SAXException :", ex);
381 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
382 }
383 catch (IOException ex) {
384 LOG.warn("IOException :", ex);
385 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
386 }
387 }
388
389 /**
390 * Write a Bundle Job into database
391 *
392 * @param Bundle job bean
393 * @return job id
394 * @throws CommandException thrown if failed to store bundle job bean to db
395 */
396 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
397 try {
398 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
399
400 bundleJob.setId(jobId);
401 bundleJob.setAuthToken(this.authToken);
402 bundleJob.setAppName(XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name"));
403 bundleJob.setAppName(bundleJob.getAppName());
404 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
405 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
406 bundleJob.setCreatedTime(new Date());
407 bundleJob.setUser(conf.get(OozieClient.USER_NAME));
408 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
409 bundleJob.setGroup(group);
410 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
411 bundleJob.setJobXml(resolvedJobXml);
412 Element jobElement = XmlUtils.parseXml(resolvedJobXml);
413 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
414 if (controlsElement != null) {
415 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
416 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
417 Date kickoffTime = DateUtils.parseDateUTC(kickoffTimeElement.getValue());
418 bundleJob.setKickoffTime(kickoffTime);
419 }
420 }
421 bundleJob.setLastModifiedTime(new Date());
422
423 if (!dryrun) {
424 jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
425 }
426 }
427 catch (Exception ex) {
428 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
429 }
430 return jobId;
431 }
432
433 /* (non-Javadoc)
434 * @see org.apache.oozie.command.TransitionXCommand#getJob()
435 */
436 @Override
437 public Job getJob() {
438 return bundleBean;
439 }
440
441 /**
442 * Resolve job xml with conf
443 *
444 * @param bundleXml bundle job xml
445 * @param conf job configuration
446 * @return resolved job xml
447 * @throws BundleJobException thrown if failed to resolve variables
448 */
449 private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
450 try {
451 ELEvaluator eval = createEvaluator(conf);
452 return eval.evaluate(bundleXml, String.class);
453 }
454 catch (Exception e) {
455 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
456 }
457 }
458
459 /**
460 * Create ELEvaluator
461 *
462 * @param conf job configuration
463 * @return ELEvaluator the evaluator for el function
464 * @throws BundleJobException thrown if failed to create evaluator
465 */
466 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
467 ELEvaluator eval;
468 ELEvaluator.Context context;
469 try {
470 context = new ELEvaluator.Context();
471 eval = new ELEvaluator(context);
472 for (Map.Entry<String, String> entry : conf) {
473 eval.setVariable(entry.getKey(), entry.getValue());
474 }
475 }
476 catch (Exception e) {
477 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
478 }
479 return eval;
480 }
481
482 /**
483 * Verify the uniqueness of coordinator names
484 *
485 * @param resolved job xml
486 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
487 */
488 @SuppressWarnings("unchecked")
489 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
490 Set<String> set = new HashSet<String>();
491 try {
492 Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
493 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
494 for (Element elem : coordElems) {
495 Attribute name = elem.getAttribute("name");
496 if (name != null) {
497 if (set.contains(name.getValue())) {
498 throw new CommandException(ErrorCode.E1304, name);
499 }
500 set.add(name.getValue());
501 }
502 else {
503 throw new CommandException(ErrorCode.E1305);
504 }
505 }
506 }
507 catch (JDOMException jex) {
508 throw new CommandException(ErrorCode.E1301, jex);
509 }
510
511 return null;
512 }
513
514 /* (non-Javadoc)
515 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
516 */
517 @Override
518 public void updateJob() throws CommandException {
519 }
520 }