001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.io.IOException;
021    import java.io.InputStreamReader;
022    import java.io.Reader;
023    import java.io.StringReader;
024    import java.io.StringWriter;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.Date;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    
033    import javax.xml.transform.stream.StreamSource;
034    import javax.xml.validation.Validator;
035    
036    import org.apache.hadoop.conf.Configuration;
037    import org.apache.hadoop.fs.FileSystem;
038    import org.apache.hadoop.fs.Path;
039    import org.apache.oozie.BundleJobBean;
040    import org.apache.oozie.ErrorCode;
041    import org.apache.oozie.client.Job;
042    import org.apache.oozie.client.OozieClient;
043    import org.apache.oozie.command.CommandException;
044    import org.apache.oozie.command.PreconditionException;
045    import org.apache.oozie.command.SubmitTransitionXCommand;
046    import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
047    import org.apache.oozie.service.HadoopAccessorException;
048    import org.apache.oozie.service.HadoopAccessorService;
049    import org.apache.oozie.service.JPAService;
050    import org.apache.oozie.service.SchemaService;
051    import org.apache.oozie.service.Services;
052    import org.apache.oozie.service.UUIDService;
053    import org.apache.oozie.service.SchemaService.SchemaName;
054    import org.apache.oozie.service.UUIDService.ApplicationType;
055    import org.apache.oozie.util.ConfigUtils;
056    import org.apache.oozie.util.DateUtils;
057    import org.apache.oozie.util.ELEvaluator;
058    import org.apache.oozie.util.ELUtils;
059    import org.apache.oozie.util.IOUtils;
060    import org.apache.oozie.util.InstrumentUtils;
061    import org.apache.oozie.util.LogUtils;
062    import org.apache.oozie.util.ParamChecker;
063    import org.apache.oozie.util.PropertiesUtils;
064    import org.apache.oozie.util.XConfiguration;
065    import org.apache.oozie.util.XmlUtils;
066    import org.apache.oozie.util.ParameterVerifier;
067    import org.jdom.Attribute;
068    import org.jdom.Element;
069    import org.jdom.JDOMException;
070    import org.xml.sax.SAXException;
071    
072    /**
073     * This Command will submit the bundle.
074     */
075    public class BundleSubmitXCommand extends SubmitTransitionXCommand {
076    
077        private Configuration conf;
078        public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
079        public static final String BUNDLE_XML_FILE = "bundle.xml";
080        private final BundleJobBean bundleBean = new BundleJobBean();
081        private String jobId;
082        private JPAService jpaService = null;
083    
084        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
085        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
086    
087        static {
088            String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
089                    PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
090                    PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
091                    PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
092                    PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
093            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
094    
095            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
096            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
097            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
098        }
099    
100        /**
101         * Constructor to create the bundle submit command.
102         *
103         * @param conf configuration for bundle job
104         */
105        public BundleSubmitXCommand(Configuration conf) {
106            super("bundle_submit", "bundle_submit", 1);
107            this.conf = ParamChecker.notNull(conf, "conf");
108        }
109    
110        /**
111         * Constructor to create the bundle submit command.
112         *
113         * @param dryrun true if dryrun is enable
114         * @param conf configuration for bundle job
115         */
116        public BundleSubmitXCommand(boolean dryrun, Configuration conf) {
117            this(conf);
118            this.dryrun = dryrun;
119        }
120    
121        /* (non-Javadoc)
122         * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
123         */
124        @Override
125        protected String submit() throws CommandException {
126            LOG.info("STARTED Bundle Submit");
127            try {
128                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
129    
130                ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml()));
131                
132                String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
133                // Resolving all variables in the job properties.
134                // This ensures the Hadoop Configuration semantics is preserved.
135                XConfiguration resolvedVarsConf = new XConfiguration();
136                for (Map.Entry<String, String> entry : conf) {
137                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
138                }
139                conf = resolvedVarsConf;
140    
141                String resolvedJobXml = resolvedVars(jobXmlWithNoComment, conf);
142    
143                //verify the uniqueness of coord names
144                verifyCoordNameUnique(resolvedJobXml);
145                this.jobId = storeToDB(bundleBean, resolvedJobXml);
146                LogUtils.setLogInfo(bundleBean, logInfo);
147    
148                if (dryrun) {
149                    Date startTime = bundleBean.getStartTime();
150                    long startTimeMilli = startTime.getTime();
151                    long endTimeMilli = startTimeMilli + (3600 * 1000);
152                    Date jobEndTime = bundleBean.getEndTime();
153                    Date endTime = new Date(endTimeMilli);
154                    if (endTime.compareTo(jobEndTime) > 0) {
155                        endTime = jobEndTime;
156                    }
157                    jobId = bundleBean.getId();
158                    LOG.info("[" + jobId + "]: Update status to PREP");
159                    bundleBean.setStatus(Job.Status.PREP);
160                    try {
161                        new XConfiguration(new StringReader(bundleBean.getConf()));
162                    }
163                    catch (IOException e1) {
164                        LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
165                    }
166                    String output = bundleBean.getJobXml() + System.getProperty("line.separator");
167                    return output;
168                }
169                else {
170                    if (bundleBean.getKickoffTime() == null) {
171                        // If there is no KickOffTime, default kickoff is NOW.
172                        LOG.debug("Since kickoff time is not defined for job id " + jobId
173                                + ". Queuing and BundleStartXCommand immediately after submission");
174                        queue(new BundleStartXCommand(jobId));
175                    }
176                }
177            }
178            catch (Exception ex) {
179                throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
180            }
181            LOG.info("ENDED Bundle Submit");
182            return this.jobId;
183        }
184    
185        /* (non-Javadoc)
186         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
187         */
188        @Override
189        public void notifyParent() throws CommandException {
190        }
191    
192        /* (non-Javadoc)
193         * @see org.apache.oozie.command.XCommand#getEntityKey()
194         */
195        @Override
196        public String getEntityKey() {
197            return null;
198        }
199    
200        /* (non-Javadoc)
201         * @see org.apache.oozie.command.XCommand#isLockRequired()
202         */
203        @Override
204        protected boolean isLockRequired() {
205            return false;
206        }
207    
208        /* (non-Javadoc)
209         * @see org.apache.oozie.command.XCommand#loadState()
210         */
211        @Override
212        protected void loadState() throws CommandException {
213        }
214    
215        /* (non-Javadoc)
216         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
217         */
218        @Override
219        protected void verifyPrecondition() throws CommandException, PreconditionException {
220        }
221    
222        /* (non-Javadoc)
223         * @see org.apache.oozie.command.XCommand#eagerLoadState()
224         */
225        @Override
226        protected void eagerLoadState() throws CommandException {
227            super.eagerLoadState();
228            jpaService = Services.get().get(JPAService.class);
229            if (jpaService == null) {
230                throw new CommandException(ErrorCode.E0610);
231            }
232        }
233    
234        /* (non-Javadoc)
235         * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
236         */
237        @Override
238        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
239            try {
240                super.eagerVerifyPrecondition();
241                mergeDefaultConfig();
242                String appXml = readAndValidateXml();
243                bundleBean.setOrigJobXml(appXml);
244                LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
245            }
246            catch (BundleJobException ex) {
247                LOG.warn("BundleJobException:  ", ex);
248                throw new CommandException(ex);
249            }
250            catch (IllegalArgumentException iex) {
251                LOG.warn("IllegalArgumentException:  ", iex);
252                throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex);
253            }
254            catch (Exception ex) {
255                LOG.warn("Exception:  ", ex);
256                throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
257            }
258        }
259    
260        /**
261         * Merge default configuration with user-defined configuration.
262         *
263         * @throws CommandException thrown if failed to merge configuration
264         */
265        protected void mergeDefaultConfig() throws CommandException {
266            Path configDefault = null;
267            try {
268                String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
269                Path bundleAppPath = new Path(bundleAppPathStr);
270                String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
271                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
272                Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
273                FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
274    
275                // app path could be a directory
276                if (!fs.isFile(bundleAppPath)) {
277                    configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
278                } else {
279                    configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
280                }
281    
282                if (fs.exists(configDefault)) {
283                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
284                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
285                    XConfiguration.injectDefaults(defaultConf, conf);
286                }
287                else {
288                    LOG.info("configDefault Doesn't exist " + configDefault);
289                }
290                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
291            }
292            catch (IOException e) {
293                throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
294                        + configDefault, e);
295            }
296            catch (HadoopAccessorException e) {
297                throw new CommandException(e);
298            }
299            LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
300        }
301    
302        /**
303         * Read the application XML and validate against bundle Schema
304         *
305         * @return validated bundle XML
306         * @throws BundleJobException thrown if failed to read or validate xml
307         */
308        private String readAndValidateXml() throws BundleJobException {
309            String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
310            String bundleXml = readDefinition(appPath);
311            validateXml(bundleXml);
312            return bundleXml;
313        }
314    
315        /**
316         * Read bundle definition.
317         *
318         * @param appPath application path.
319         * @param user user name.
320         * @param group group name.
321         * @return bundle definition.
322         * @throws BundleJobException thrown if the definition could not be read.
323         */
324        protected String readDefinition(String appPath) throws BundleJobException {
325            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
326            //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
327            try {
328                URI uri = new URI(appPath);
329                LOG.debug("user =" + user);
330                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
331                Configuration fsConf = has.createJobConf(uri.getAuthority());
332                FileSystem fs = has.createFileSystem(user, uri, fsConf);
333                Path appDefPath = null;
334    
335                // app path could be a directory
336                Path path = new Path(uri.getPath());
337                if (!fs.isFile(path)) {
338                    appDefPath = new Path(path, BUNDLE_XML_FILE);
339                } else {
340                    appDefPath = path;
341                }
342    
343                Reader reader = new InputStreamReader(fs.open(appDefPath));
344                StringWriter writer = new StringWriter();
345                IOUtils.copyCharStream(reader, writer);
346                return writer.toString();
347            }
348            catch (IOException ex) {
349                LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
350                throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
351            }
352            catch (URISyntaxException ex) {
353                LOG.warn("URISyException :" + ex.getMessage());
354                throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
355            }
356            catch (HadoopAccessorException ex) {
357                throw new BundleJobException(ex);
358            }
359            catch (Exception ex) {
360                LOG.warn("Exception :", ex);
361                throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
362            }
363        }
364    
365        /**
366         * Validate against Bundle XSD file
367         *
368         * @param xmlContent input bundle xml
369         * @throws BundleJobException thrown if failed to validate xml
370         */
371        private void validateXml(String xmlContent) throws BundleJobException {
372            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
373            Validator validator = schema.newValidator();
374            try {
375                validator.validate(new StreamSource(new StringReader(xmlContent)));
376            }
377            catch (SAXException ex) {
378                LOG.warn("SAXException :", ex);
379                throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
380            }
381            catch (IOException ex) {
382                LOG.warn("IOException :", ex);
383                throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
384            }
385        }
386    
387        /**
388         * Write a Bundle Job into database
389         *
390         * @param Bundle job bean
391         * @return job id
392         * @throws CommandException thrown if failed to store bundle job bean to db
393         */
394        private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
395            try {
396                jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
397    
398                bundleJob.setId(jobId);
399                String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name");
400                name = ELUtils.resolveAppName(name, conf);
401                bundleJob.setAppName(name);
402                bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
403                // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
404                bundleJob.setCreatedTime(new Date());
405                bundleJob.setUser(conf.get(OozieClient.USER_NAME));
406                String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
407                bundleJob.setGroup(group);
408                bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
409                bundleJob.setJobXml(resolvedJobXml);
410                Element jobElement = XmlUtils.parseXml(resolvedJobXml);
411                Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
412                if (controlsElement != null) {
413                    Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
414                    if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
415                        Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue());
416                        bundleJob.setKickoffTime(kickoffTime);
417                    }
418                }
419                bundleJob.setLastModifiedTime(new Date());
420    
421                if (!dryrun) {
422                    jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
423                }
424            }
425            catch (Exception ex) {
426                throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
427            }
428            return jobId;
429        }
430    
431        /* (non-Javadoc)
432         * @see org.apache.oozie.command.TransitionXCommand#getJob()
433         */
434        @Override
435        public Job getJob() {
436            return bundleBean;
437        }
438    
439        /**
440         * Resolve job xml with conf
441         *
442         * @param bundleXml bundle job xml
443         * @param conf job configuration
444         * @return resolved job xml
445         * @throws BundleJobException thrown if failed to resolve variables
446         */
447        private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
448            try {
449                ELEvaluator eval = createEvaluator(conf);
450                return eval.evaluate(bundleXml, String.class);
451            }
452            catch (Exception e) {
453                throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
454            }
455        }
456    
457        /**
458         * Create ELEvaluator
459         *
460         * @param conf job configuration
461         * @return ELEvaluator the evaluator for el function
462         * @throws BundleJobException thrown if failed to create evaluator
463         */
464        public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
465            ELEvaluator eval;
466            ELEvaluator.Context context;
467            try {
468                context = new ELEvaluator.Context();
469                eval = new ELEvaluator(context);
470                for (Map.Entry<String, String> entry : conf) {
471                    eval.setVariable(entry.getKey(), entry.getValue());
472                }
473            }
474            catch (Exception e) {
475                throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
476            }
477            return eval;
478        }
479    
480        /**
481         * Verify the uniqueness of coordinator names
482         *
483         * @param resolved job xml
484         * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
485         */
486        @SuppressWarnings("unchecked")
487        private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
488            Set<String> set = new HashSet<String>();
489            try {
490                Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
491                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
492                for (Element elem : coordElems) {
493                    Attribute name = elem.getAttribute("name");
494                    if (name != null) {
495                        if (set.contains(name.getValue())) {
496                            throw new CommandException(ErrorCode.E1304, name);
497                        }
498                        set.add(name.getValue());
499                    }
500                    else {
501                        throw new CommandException(ErrorCode.E1305);
502                    }
503                }
504            }
505            catch (JDOMException jex) {
506                throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
507            }
508    
509            return null;
510        }
511    
512        /* (non-Javadoc)
513         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
514         */
515        @Override
516        public void updateJob() throws CommandException {
517        }
518    
519        @Override
520        public void performWrites() throws CommandException {
521        }
522    }