This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.bundle;
019
020 import java.io.IOException;
021 import java.io.InputStreamReader;
022 import java.io.Reader;
023 import java.io.StringReader;
024 import java.io.StringWriter;
025 import java.net.URI;
026 import java.net.URISyntaxException;
027 import java.util.Date;
028 import java.util.HashSet;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032
033 import javax.xml.transform.stream.StreamSource;
034 import javax.xml.validation.Validator;
035
036 import org.apache.hadoop.conf.Configuration;
037 import org.apache.hadoop.fs.FileSystem;
038 import org.apache.hadoop.fs.Path;
039 import org.apache.oozie.BundleJobBean;
040 import org.apache.oozie.ErrorCode;
041 import org.apache.oozie.client.Job;
042 import org.apache.oozie.client.OozieClient;
043 import org.apache.oozie.command.CommandException;
044 import org.apache.oozie.command.PreconditionException;
045 import org.apache.oozie.command.SubmitTransitionXCommand;
046 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
047 import org.apache.oozie.service.HadoopAccessorException;
048 import org.apache.oozie.service.HadoopAccessorService;
049 import org.apache.oozie.service.JPAService;
050 import org.apache.oozie.service.SchemaService;
051 import org.apache.oozie.service.Services;
052 import org.apache.oozie.service.UUIDService;
053 import org.apache.oozie.service.WorkflowAppService;
054 import org.apache.oozie.service.SchemaService.SchemaName;
055 import org.apache.oozie.service.UUIDService.ApplicationType;
056 import org.apache.oozie.util.DateUtils;
057 import org.apache.oozie.util.ELEvaluator;
058 import org.apache.oozie.util.IOUtils;
059 import org.apache.oozie.util.InstrumentUtils;
060 import org.apache.oozie.util.LogUtils;
061 import org.apache.oozie.util.ParamChecker;
062 import org.apache.oozie.util.PropertiesUtils;
063 import org.apache.oozie.util.XConfiguration;
064 import org.apache.oozie.util.XmlUtils;
065 import org.jdom.Attribute;
066 import org.jdom.Element;
067 import org.jdom.JDOMException;
068 import org.xml.sax.SAXException;
069
070 /**
071 * This Command will submit the bundle.
072 */
073 public class BundleSubmitXCommand extends SubmitTransitionXCommand {
074
075 private Configuration conf;
076 private final String authToken;
077 public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
078 public static final String BUNDLE_XML_FILE = "bundle.xml";
079 private final BundleJobBean bundleBean = new BundleJobBean();
080 private String jobId;
081 private JPAService jpaService = null;
082
083 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
084 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
085
086 static {
087 String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
088 PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
089 PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
090 PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
091 PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
092 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
093
094 String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
095 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
096 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
097 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
098 }
099
100 /**
101 * Constructor to create the bundle submit command.
102 *
103 * @param conf configuration for bundle job
104 * @param authToken to be used for authentication
105 */
106 public BundleSubmitXCommand(Configuration conf, String authToken) {
107 super("bundle_submit", "bundle_submit", 1);
108 this.conf = ParamChecker.notNull(conf, "conf");
109 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
110 }
111
112 /**
113 * Constructor to create the bundle submit command.
114 *
115 * @param dryrun true if dryrun is enable
116 * @param conf configuration for bundle job
117 * @param authToken to be used for authentication
118 */
119 public BundleSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
120 this(conf, authToken);
121 this.dryrun = dryrun;
122 }
123
124 /* (non-Javadoc)
125 * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
126 */
127 @Override
128 protected String submit() throws CommandException {
129 LOG.info("STARTED Bundle Submit");
130 try {
131 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
132
133 XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
134 // Resolving all variables in the job properties.
135 // This ensures the Hadoop Configuration semantics is preserved.
136 XConfiguration resolvedVarsConf = new XConfiguration();
137 for (Map.Entry<String, String> entry : conf) {
138 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
139 }
140 conf = resolvedVarsConf;
141
142 String resolvedJobXml = resolvedVars(bundleBean.getOrigJobXml(), conf);
143
144 //verify the uniqueness of coord names
145 verifyCoordNameUnique(resolvedJobXml);
146 this.jobId = storeToDB(bundleBean, resolvedJobXml);
147 LogUtils.setLogInfo(bundleBean, logInfo);
148
149 if (dryrun) {
150 Date startTime = bundleBean.getStartTime();
151 long startTimeMilli = startTime.getTime();
152 long endTimeMilli = startTimeMilli + (3600 * 1000);
153 Date jobEndTime = bundleBean.getEndTime();
154 Date endTime = new Date(endTimeMilli);
155 if (endTime.compareTo(jobEndTime) > 0) {
156 endTime = jobEndTime;
157 }
158 jobId = bundleBean.getId();
159 LOG.info("[" + jobId + "]: Update status to PREP");
160 bundleBean.setStatus(Job.Status.PREP);
161 try {
162 new XConfiguration(new StringReader(bundleBean.getConf()));
163 }
164 catch (IOException e1) {
165 LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
166 }
167 String output = bundleBean.getJobXml() + System.getProperty("line.separator");
168 return output;
169 }
170 else {
171 if (bundleBean.getKickoffTime() == null) {
172 // If there is no KickOffTime, default kickoff is NOW.
173 LOG.debug("Since kickoff time is not defined for job id " + jobId
174 + ". Queuing and BundleStartXCommand immediately after submission");
175 queue(new BundleStartXCommand(jobId));
176 }
177 }
178 }
179 catch (Exception ex) {
180 throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
181 }
182 LOG.info("ENDED Bundle Submit");
183 return this.jobId;
184 }
185
186 /* (non-Javadoc)
187 * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
188 */
189 @Override
190 public void notifyParent() throws CommandException {
191 }
192
193 /* (non-Javadoc)
194 * @see org.apache.oozie.command.XCommand#getEntityKey()
195 */
196 @Override
197 protected String getEntityKey() {
198 return null;
199 }
200
201 /* (non-Javadoc)
202 * @see org.apache.oozie.command.XCommand#isLockRequired()
203 */
204 @Override
205 protected boolean isLockRequired() {
206 return false;
207 }
208
209 /* (non-Javadoc)
210 * @see org.apache.oozie.command.XCommand#loadState()
211 */
212 @Override
213 protected void loadState() throws CommandException {
214 }
215
216 /* (non-Javadoc)
217 * @see org.apache.oozie.command.XCommand#verifyPrecondition()
218 */
219 @Override
220 protected void verifyPrecondition() throws CommandException, PreconditionException {
221 }
222
223 /* (non-Javadoc)
224 * @see org.apache.oozie.command.XCommand#eagerLoadState()
225 */
226 @Override
227 protected void eagerLoadState() throws CommandException {
228 super.eagerLoadState();
229 jpaService = Services.get().get(JPAService.class);
230 if (jpaService == null) {
231 throw new CommandException(ErrorCode.E0610);
232 }
233 }
234
235 /* (non-Javadoc)
236 * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
237 */
238 @Override
239 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
240 try {
241 super.eagerVerifyPrecondition();
242 mergeDefaultConfig();
243 String appXml = readAndValidateXml();
244 bundleBean.setOrigJobXml(appXml);
245 LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
246 }
247 catch (BundleJobException ex) {
248 LOG.warn("BundleJobException: ", ex);
249 throw new CommandException(ex);
250 }
251 catch (IllegalArgumentException iex) {
252 LOG.warn("IllegalArgumentException: ", iex);
253 throw new CommandException(ErrorCode.E1310, iex);
254 }
255 catch (Exception ex) {
256 LOG.warn("Exception: ", ex);
257 throw new CommandException(ErrorCode.E1310, ex);
258 }
259 }
260
261 /**
262 * Merge default configuration with user-defined configuration.
263 *
264 * @throws CommandException thrown if failed to merge configuration
265 */
266 protected void mergeDefaultConfig() throws CommandException {
267 Path configDefault = null;
268 try {
269 String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
270 Path bundleAppPath = new Path(bundleAppPathStr);
271 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
272 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
273 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, bundleAppPath.toUri(),
274 new Configuration());
275
276 // app path could be a directory
277 if (!fs.isFile(bundleAppPath)) {
278 configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
279 } else {
280 configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
281 }
282
283 if (fs.exists(configDefault)) {
284 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
285 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
286 XConfiguration.injectDefaults(defaultConf, conf);
287 }
288 else {
289 LOG.info("configDefault Doesn't exist " + configDefault);
290 }
291 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
292 }
293 catch (IOException e) {
294 throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
295 + configDefault, e);
296 }
297 catch (HadoopAccessorException e) {
298 throw new CommandException(e);
299 }
300 LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
301 }
302
303 /**
304 * Read the application XML and validate against bundle Schema
305 *
306 * @return validated bundle XML
307 * @throws BundleJobException thrown if failed to read or validate xml
308 */
309 private String readAndValidateXml() throws BundleJobException {
310 String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
311 String bundleXml = readDefinition(appPath);
312 validateXml(bundleXml);
313 return bundleXml;
314 }
315
316 /**
317 * Read bundle definition.
318 *
319 * @param appPath application path.
320 * @param user user name.
321 * @param group group name.
322 * @param autToken authentication token.
323 * @return bundle definition.
324 * @throws BundleJobException thrown if the definition could not be read.
325 */
326 protected String readDefinition(String appPath) throws BundleJobException {
327 String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
328 String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
329 //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
330 try {
331 URI uri = new URI(appPath);
332 LOG.debug("user =" + user + " group =" + group);
333 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri,
334 new Configuration());
335 Path appDefPath = null;
336
337 // app path could be a directory
338 Path path = new Path(uri.getPath());
339 if (!fs.isFile(path)) {
340 appDefPath = new Path(path, BUNDLE_XML_FILE);
341 } else {
342 appDefPath = path;
343 }
344
345 Reader reader = new InputStreamReader(fs.open(appDefPath));
346 StringWriter writer = new StringWriter();
347 IOUtils.copyCharStream(reader, writer);
348 return writer.toString();
349 }
350 catch (IOException ex) {
351 LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
352 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
353 }
354 catch (URISyntaxException ex) {
355 LOG.warn("URISyException :" + ex.getMessage());
356 throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
357 }
358 catch (HadoopAccessorException ex) {
359 throw new BundleJobException(ex);
360 }
361 catch (Exception ex) {
362 LOG.warn("Exception :", ex);
363 throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
364 }
365 }
366
367 /**
368 * Validate against Bundle XSD file
369 *
370 * @param xmlContent input bundle xml
371 * @throws BundleJobException thrown if failed to validate xml
372 */
373 private void validateXml(String xmlContent) throws BundleJobException {
374 javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
375 Validator validator = schema.newValidator();
376 try {
377 validator.validate(new StreamSource(new StringReader(xmlContent)));
378 }
379 catch (SAXException ex) {
380 LOG.warn("SAXException :", ex);
381 throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
382 }
383 catch (IOException ex) {
384 LOG.warn("IOException :", ex);
385 throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
386 }
387 }
388
389 /**
390 * Write a Bundle Job into database
391 *
392 * @param Bundle job bean
393 * @return job id
394 * @throws CommandException thrown if failed to store bundle job bean to db
395 */
396 private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
397 try {
398 jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
399
400 bundleJob.setId(jobId);
401 bundleJob.setAuthToken(this.authToken);
402 bundleJob.setAppName(XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name"));
403 bundleJob.setAppName(bundleJob.getAppName());
404 bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
405 // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
406 bundleJob.setCreatedTime(new Date());
407 bundleJob.setUser(conf.get(OozieClient.USER_NAME));
408 bundleJob.setGroup(conf.get(OozieClient.GROUP_NAME));
409 bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
410 bundleJob.setJobXml(resolvedJobXml);
411 Element jobElement = XmlUtils.parseXml(resolvedJobXml);
412 Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
413 if (controlsElement != null) {
414 Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
415 if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
416 Date kickoffTime = DateUtils.parseDateUTC(kickoffTimeElement.getValue());
417 bundleJob.setKickoffTime(kickoffTime);
418 }
419 }
420 bundleJob.setLastModifiedTime(new Date());
421
422 if (!dryrun) {
423 jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
424 }
425 }
426 catch (Exception ex) {
427 throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
428 }
429 return jobId;
430 }
431
432 /* (non-Javadoc)
433 * @see org.apache.oozie.command.TransitionXCommand#getJob()
434 */
435 @Override
436 public Job getJob() {
437 return bundleBean;
438 }
439
440 /**
441 * Resolve job xml with conf
442 *
443 * @param bundleXml bundle job xml
444 * @param conf job configuration
445 * @return resolved job xml
446 * @throws BundleJobException thrown if failed to resolve variables
447 */
448 private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
449 try {
450 ELEvaluator eval = createEvaluator(conf);
451 return eval.evaluate(bundleXml, String.class);
452 }
453 catch (Exception e) {
454 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
455 }
456 }
457
458 /**
459 * Create ELEvaluator
460 *
461 * @param conf job configuration
462 * @return ELEvaluator the evaluator for el function
463 * @throws BundleJobException thrown if failed to create evaluator
464 */
465 public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
466 ELEvaluator eval;
467 ELEvaluator.Context context;
468 try {
469 context = new ELEvaluator.Context();
470 eval = new ELEvaluator(context);
471 for (Map.Entry<String, String> entry : conf) {
472 eval.setVariable(entry.getKey(), entry.getValue());
473 }
474 }
475 catch (Exception e) {
476 throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
477 }
478 return eval;
479 }
480
481 /**
482 * Verify the uniqueness of coordinator names
483 *
484 * @param resolved job xml
485 * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
486 */
487 @SuppressWarnings("unchecked")
488 private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
489 Set<String> set = new HashSet<String>();
490 try {
491 Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
492 List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
493 for (Element elem : coordElems) {
494 Attribute name = elem.getAttribute("name");
495 if (name != null) {
496 if (set.contains(name.getValue())) {
497 throw new CommandException(ErrorCode.E1304, name);
498 }
499 set.add(name.getValue());
500 }
501 else {
502 throw new CommandException(ErrorCode.E1305);
503 }
504 }
505 }
506 catch (JDOMException jex) {
507 throw new CommandException(ErrorCode.E1301, jex);
508 }
509
510 return null;
511 }
512
513 /* (non-Javadoc)
514 * @see org.apache.oozie.command.TransitionXCommand#updateJob()
515 */
516 @Override
517 public void updateJob() throws CommandException {
518 }
519 }