001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.io.IOException;
021    import java.io.InputStreamReader;
022    import java.io.Reader;
023    import java.io.StringReader;
024    import java.io.StringWriter;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.Date;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    
033    import javax.xml.transform.stream.StreamSource;
034    import javax.xml.validation.Validator;
035    
036    import org.apache.hadoop.conf.Configuration;
037    import org.apache.hadoop.fs.FileSystem;
038    import org.apache.hadoop.fs.Path;
039    import org.apache.oozie.BundleJobBean;
040    import org.apache.oozie.ErrorCode;
041    import org.apache.oozie.client.Job;
042    import org.apache.oozie.client.OozieClient;
043    import org.apache.oozie.command.CommandException;
044    import org.apache.oozie.command.PreconditionException;
045    import org.apache.oozie.command.SubmitTransitionXCommand;
046    import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
047    import org.apache.oozie.service.HadoopAccessorException;
048    import org.apache.oozie.service.HadoopAccessorService;
049    import org.apache.oozie.service.JPAService;
050    import org.apache.oozie.service.SchemaService;
051    import org.apache.oozie.service.Services;
052    import org.apache.oozie.service.UUIDService;
053    import org.apache.oozie.service.SchemaService.SchemaName;
054    import org.apache.oozie.service.UUIDService.ApplicationType;
055    import org.apache.oozie.util.ConfigUtils;
056    import org.apache.oozie.util.DateUtils;
057    import org.apache.oozie.util.ELEvaluator;
058    import org.apache.oozie.util.ELUtils;
059    import org.apache.oozie.util.IOUtils;
060    import org.apache.oozie.util.InstrumentUtils;
061    import org.apache.oozie.util.LogUtils;
062    import org.apache.oozie.util.ParamChecker;
063    import org.apache.oozie.util.PropertiesUtils;
064    import org.apache.oozie.util.XConfiguration;
065    import org.apache.oozie.util.XmlUtils;
066    import org.apache.oozie.util.ParameterVerifier;
067    import org.jdom.Attribute;
068    import org.jdom.Element;
069    import org.jdom.JDOMException;
070    import org.xml.sax.SAXException;
071    
072    /**
073     * This Command will submit the bundle.
074     */
075    public class BundleSubmitXCommand extends SubmitTransitionXCommand {
076    
077        private Configuration conf;
078        private final String authToken;
079        public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
080        public static final String BUNDLE_XML_FILE = "bundle.xml";
081        private final BundleJobBean bundleBean = new BundleJobBean();
082        private String jobId;
083        private JPAService jpaService = null;
084    
085        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
086        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
087    
088        static {
089            String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
090                    PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
091                    PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
092                    PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
093                    PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
094            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
095    
096            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
097            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
098            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
099        }
100    
101        /**
102         * Constructor to create the bundle submit command.
103         *
104         * @param conf configuration for bundle job
105         * @param authToken to be used for authentication
106         */
107        public BundleSubmitXCommand(Configuration conf, String authToken) {
108            super("bundle_submit", "bundle_submit", 1);
109            this.conf = ParamChecker.notNull(conf, "conf");
110            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
111        }
112    
113        /**
114         * Constructor to create the bundle submit command.
115         *
116         * @param dryrun true if dryrun is enable
117         * @param conf configuration for bundle job
118         * @param authToken to be used for authentication
119         */
120        public BundleSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
121            this(conf, authToken);
122            this.dryrun = dryrun;
123        }
124    
125        /* (non-Javadoc)
126         * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
127         */
128        @Override
129        protected String submit() throws CommandException {
130            LOG.info("STARTED Bundle Submit");
131            try {
132                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
133    
134                ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml()));
135                
136                XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
137                // Resolving all variables in the job properties.
138                // This ensures the Hadoop Configuration semantics is preserved.
139                XConfiguration resolvedVarsConf = new XConfiguration();
140                for (Map.Entry<String, String> entry : conf) {
141                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
142                }
143                conf = resolvedVarsConf;
144    
145                String resolvedJobXml = resolvedVars(bundleBean.getOrigJobXml(), conf);
146    
147                //verify the uniqueness of coord names
148                verifyCoordNameUnique(resolvedJobXml);
149                this.jobId = storeToDB(bundleBean, resolvedJobXml);
150                LogUtils.setLogInfo(bundleBean, logInfo);
151    
152                if (dryrun) {
153                    Date startTime = bundleBean.getStartTime();
154                    long startTimeMilli = startTime.getTime();
155                    long endTimeMilli = startTimeMilli + (3600 * 1000);
156                    Date jobEndTime = bundleBean.getEndTime();
157                    Date endTime = new Date(endTimeMilli);
158                    if (endTime.compareTo(jobEndTime) > 0) {
159                        endTime = jobEndTime;
160                    }
161                    jobId = bundleBean.getId();
162                    LOG.info("[" + jobId + "]: Update status to PREP");
163                    bundleBean.setStatus(Job.Status.PREP);
164                    try {
165                        new XConfiguration(new StringReader(bundleBean.getConf()));
166                    }
167                    catch (IOException e1) {
168                        LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
169                    }
170                    String output = bundleBean.getJobXml() + System.getProperty("line.separator");
171                    return output;
172                }
173                else {
174                    if (bundleBean.getKickoffTime() == null) {
175                        // If there is no KickOffTime, default kickoff is NOW.
176                        LOG.debug("Since kickoff time is not defined for job id " + jobId
177                                + ". Queuing and BundleStartXCommand immediately after submission");
178                        queue(new BundleStartXCommand(jobId));
179                    }
180                }
181            }
182            catch (Exception ex) {
183                throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
184            }
185            LOG.info("ENDED Bundle Submit");
186            return this.jobId;
187        }
188    
189        /* (non-Javadoc)
190         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
191         */
192        @Override
193        public void notifyParent() throws CommandException {
194        }
195    
196        /* (non-Javadoc)
197         * @see org.apache.oozie.command.XCommand#getEntityKey()
198         */
199        @Override
200        public String getEntityKey() {
201            return null;
202        }
203    
204        /* (non-Javadoc)
205         * @see org.apache.oozie.command.XCommand#isLockRequired()
206         */
207        @Override
208        protected boolean isLockRequired() {
209            return false;
210        }
211    
212        /* (non-Javadoc)
213         * @see org.apache.oozie.command.XCommand#loadState()
214         */
215        @Override
216        protected void loadState() throws CommandException {
217        }
218    
219        /* (non-Javadoc)
220         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
221         */
222        @Override
223        protected void verifyPrecondition() throws CommandException, PreconditionException {
224        }
225    
226        /* (non-Javadoc)
227         * @see org.apache.oozie.command.XCommand#eagerLoadState()
228         */
229        @Override
230        protected void eagerLoadState() throws CommandException {
231            super.eagerLoadState();
232            jpaService = Services.get().get(JPAService.class);
233            if (jpaService == null) {
234                throw new CommandException(ErrorCode.E0610);
235            }
236        }
237    
238        /* (non-Javadoc)
239         * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
240         */
241        @Override
242        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
243            try {
244                super.eagerVerifyPrecondition();
245                mergeDefaultConfig();
246                String appXml = readAndValidateXml();
247                bundleBean.setOrigJobXml(appXml);
248                LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
249            }
250            catch (BundleJobException ex) {
251                LOG.warn("BundleJobException:  ", ex);
252                throw new CommandException(ex);
253            }
254            catch (IllegalArgumentException iex) {
255                LOG.warn("IllegalArgumentException:  ", iex);
256                throw new CommandException(ErrorCode.E1310, iex);
257            }
258            catch (Exception ex) {
259                LOG.warn("Exception:  ", ex);
260                throw new CommandException(ErrorCode.E1310, ex);
261            }
262        }
263    
264        /**
265         * Merge default configuration with user-defined configuration.
266         *
267         * @throws CommandException thrown if failed to merge configuration
268         */
269        protected void mergeDefaultConfig() throws CommandException {
270            Path configDefault = null;
271            try {
272                String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
273                Path bundleAppPath = new Path(bundleAppPathStr);
274                String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
275                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
276                Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
277                FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
278    
279                // app path could be a directory
280                if (!fs.isFile(bundleAppPath)) {
281                    configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
282                } else {
283                    configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
284                }
285    
286                if (fs.exists(configDefault)) {
287                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
288                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
289                    XConfiguration.injectDefaults(defaultConf, conf);
290                }
291                else {
292                    LOG.info("configDefault Doesn't exist " + configDefault);
293                }
294                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
295            }
296            catch (IOException e) {
297                throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
298                        + configDefault, e);
299            }
300            catch (HadoopAccessorException e) {
301                throw new CommandException(e);
302            }
303            LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
304        }
305    
306        /**
307         * Read the application XML and validate against bundle Schema
308         *
309         * @return validated bundle XML
310         * @throws BundleJobException thrown if failed to read or validate xml
311         */
312        private String readAndValidateXml() throws BundleJobException {
313            String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
314            String bundleXml = readDefinition(appPath);
315            validateXml(bundleXml);
316            return bundleXml;
317        }
318    
319        /**
320         * Read bundle definition.
321         *
322         * @param appPath application path.
323         * @param user user name.
324         * @param group group name.
325         * @param autToken authentication token.
326         * @return bundle definition.
327         * @throws BundleJobException thrown if the definition could not be read.
328         */
329        protected String readDefinition(String appPath) throws BundleJobException {
330            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
331            //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
332            try {
333                URI uri = new URI(appPath);
334                LOG.debug("user =" + user);
335                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
336                Configuration fsConf = has.createJobConf(uri.getAuthority());
337                FileSystem fs = has.createFileSystem(user, uri, fsConf);
338                Path appDefPath = null;
339    
340                // app path could be a directory
341                Path path = new Path(uri.getPath());
342                if (!fs.isFile(path)) {
343                    appDefPath = new Path(path, BUNDLE_XML_FILE);
344                } else {
345                    appDefPath = path;
346                }
347    
348                Reader reader = new InputStreamReader(fs.open(appDefPath));
349                StringWriter writer = new StringWriter();
350                IOUtils.copyCharStream(reader, writer);
351                return writer.toString();
352            }
353            catch (IOException ex) {
354                LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
355                throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
356            }
357            catch (URISyntaxException ex) {
358                LOG.warn("URISyException :" + ex.getMessage());
359                throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
360            }
361            catch (HadoopAccessorException ex) {
362                throw new BundleJobException(ex);
363            }
364            catch (Exception ex) {
365                LOG.warn("Exception :", ex);
366                throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
367            }
368        }
369    
370        /**
371         * Validate against Bundle XSD file
372         *
373         * @param xmlContent input bundle xml
374         * @throws BundleJobException thrown if failed to validate xml
375         */
376        private void validateXml(String xmlContent) throws BundleJobException {
377            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
378            Validator validator = schema.newValidator();
379            try {
380                validator.validate(new StreamSource(new StringReader(xmlContent)));
381            }
382            catch (SAXException ex) {
383                LOG.warn("SAXException :", ex);
384                throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
385            }
386            catch (IOException ex) {
387                LOG.warn("IOException :", ex);
388                throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
389            }
390        }
391    
392        /**
393         * Write a Bundle Job into database
394         *
395         * @param Bundle job bean
396         * @return job id
397         * @throws CommandException thrown if failed to store bundle job bean to db
398         */
399        private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
400            try {
401                jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
402    
403                bundleJob.setId(jobId);
404                bundleJob.setAuthToken(this.authToken);
405                String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name");
406                name = ELUtils.resolveAppName(name, conf);
407                bundleJob.setAppName(name);
408                bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
409                // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
410                bundleJob.setCreatedTime(new Date());
411                bundleJob.setUser(conf.get(OozieClient.USER_NAME));
412                String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
413                bundleJob.setGroup(group);
414                bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
415                bundleJob.setJobXml(resolvedJobXml);
416                Element jobElement = XmlUtils.parseXml(resolvedJobXml);
417                Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
418                if (controlsElement != null) {
419                    Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
420                    if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
421                        Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue());
422                        bundleJob.setKickoffTime(kickoffTime);
423                    }
424                }
425                bundleJob.setLastModifiedTime(new Date());
426    
427                if (!dryrun) {
428                    jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
429                }
430            }
431            catch (Exception ex) {
432                throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
433            }
434            return jobId;
435        }
436    
437        /* (non-Javadoc)
438         * @see org.apache.oozie.command.TransitionXCommand#getJob()
439         */
440        @Override
441        public Job getJob() {
442            return bundleBean;
443        }
444    
445        /**
446         * Resolve job xml with conf
447         *
448         * @param bundleXml bundle job xml
449         * @param conf job configuration
450         * @return resolved job xml
451         * @throws BundleJobException thrown if failed to resolve variables
452         */
453        private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
454            try {
455                ELEvaluator eval = createEvaluator(conf);
456                return eval.evaluate(bundleXml, String.class);
457            }
458            catch (Exception e) {
459                throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
460            }
461        }
462    
463        /**
464         * Create ELEvaluator
465         *
466         * @param conf job configuration
467         * @return ELEvaluator the evaluator for el function
468         * @throws BundleJobException thrown if failed to create evaluator
469         */
470        public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
471            ELEvaluator eval;
472            ELEvaluator.Context context;
473            try {
474                context = new ELEvaluator.Context();
475                eval = new ELEvaluator(context);
476                for (Map.Entry<String, String> entry : conf) {
477                    eval.setVariable(entry.getKey(), entry.getValue());
478                }
479            }
480            catch (Exception e) {
481                throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
482            }
483            return eval;
484        }
485    
486        /**
487         * Verify the uniqueness of coordinator names
488         *
489         * @param resolved job xml
490         * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
491         */
492        @SuppressWarnings("unchecked")
493        private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
494            Set<String> set = new HashSet<String>();
495            try {
496                Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
497                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
498                for (Element elem : coordElems) {
499                    Attribute name = elem.getAttribute("name");
500                    if (name != null) {
501                        if (set.contains(name.getValue())) {
502                            throw new CommandException(ErrorCode.E1304, name);
503                        }
504                        set.add(name.getValue());
505                    }
506                    else {
507                        throw new CommandException(ErrorCode.E1305);
508                    }
509                }
510            }
511            catch (JDOMException jex) {
512                throw new CommandException(ErrorCode.E1301, jex);
513            }
514    
515            return null;
516        }
517    
518        /* (non-Javadoc)
519         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
520         */
521        @Override
522        public void updateJob() throws CommandException {
523        }
524    
525        @Override
526        public void performWrites() throws CommandException {
527        }
528    }