001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.io.IOException;
021    import java.io.InputStreamReader;
022    import java.io.Reader;
023    import java.io.StringReader;
024    import java.io.StringWriter;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.Date;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    
033    import javax.xml.transform.stream.StreamSource;
034    import javax.xml.validation.Validator;
035    
036    import org.apache.hadoop.conf.Configuration;
037    import org.apache.hadoop.fs.FileSystem;
038    import org.apache.hadoop.fs.Path;
039    import org.apache.oozie.BundleJobBean;
040    import org.apache.oozie.ErrorCode;
041    import org.apache.oozie.client.Job;
042    import org.apache.oozie.client.OozieClient;
043    import org.apache.oozie.command.CommandException;
044    import org.apache.oozie.command.PreconditionException;
045    import org.apache.oozie.command.SubmitTransitionXCommand;
046    import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
047    import org.apache.oozie.service.HadoopAccessorException;
048    import org.apache.oozie.service.HadoopAccessorService;
049    import org.apache.oozie.service.JPAService;
050    import org.apache.oozie.service.SchemaService;
051    import org.apache.oozie.service.Services;
052    import org.apache.oozie.service.UUIDService;
053    import org.apache.oozie.service.WorkflowAppService;
054    import org.apache.oozie.service.SchemaService.SchemaName;
055    import org.apache.oozie.service.UUIDService.ApplicationType;
056    import org.apache.oozie.util.ConfigUtils;
057    import org.apache.oozie.util.DateUtils;
058    import org.apache.oozie.util.ELEvaluator;
059    import org.apache.oozie.util.IOUtils;
060    import org.apache.oozie.util.InstrumentUtils;
061    import org.apache.oozie.util.LogUtils;
062    import org.apache.oozie.util.ParamChecker;
063    import org.apache.oozie.util.PropertiesUtils;
064    import org.apache.oozie.util.XConfiguration;
065    import org.apache.oozie.util.XmlUtils;
066    import org.jdom.Attribute;
067    import org.jdom.Element;
068    import org.jdom.JDOMException;
069    import org.xml.sax.SAXException;
070    
071    /**
072     * This Command will submit the bundle.
073     */
074    public class BundleSubmitXCommand extends SubmitTransitionXCommand {
075    
076        private Configuration conf;
077        private final String authToken;
078        public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
079        public static final String BUNDLE_XML_FILE = "bundle.xml";
080        private final BundleJobBean bundleBean = new BundleJobBean();
081        private String jobId;
082        private JPAService jpaService = null;
083    
084        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
085        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
086    
087        static {
088            String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
089                    PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
090                    PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
091                    PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
092                    PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
093            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
094    
095            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
096            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
097            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
098        }
099    
100        /**
101         * Constructor to create the bundle submit command.
102         *
103         * @param conf configuration for bundle job
104         * @param authToken to be used for authentication
105         */
106        public BundleSubmitXCommand(Configuration conf, String authToken) {
107            super("bundle_submit", "bundle_submit", 1);
108            this.conf = ParamChecker.notNull(conf, "conf");
109            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
110        }
111    
112        /**
113         * Constructor to create the bundle submit command.
114         *
115         * @param dryrun true if dryrun is enable
116         * @param conf configuration for bundle job
117         * @param authToken to be used for authentication
118         */
119        public BundleSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
120            this(conf, authToken);
121            this.dryrun = dryrun;
122        }
123    
124        /* (non-Javadoc)
125         * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
126         */
127        @Override
128        protected String submit() throws CommandException {
129            LOG.info("STARTED Bundle Submit");
130            try {
131                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
132    
133                XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
134                // Resolving all variables in the job properties.
135                // This ensures the Hadoop Configuration semantics is preserved.
136                XConfiguration resolvedVarsConf = new XConfiguration();
137                for (Map.Entry<String, String> entry : conf) {
138                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
139                }
140                conf = resolvedVarsConf;
141    
142                String resolvedJobXml = resolvedVars(bundleBean.getOrigJobXml(), conf);
143    
144                //verify the uniqueness of coord names
145                verifyCoordNameUnique(resolvedJobXml);
146                this.jobId = storeToDB(bundleBean, resolvedJobXml);
147                LogUtils.setLogInfo(bundleBean, logInfo);
148    
149                if (dryrun) {
150                    Date startTime = bundleBean.getStartTime();
151                    long startTimeMilli = startTime.getTime();
152                    long endTimeMilli = startTimeMilli + (3600 * 1000);
153                    Date jobEndTime = bundleBean.getEndTime();
154                    Date endTime = new Date(endTimeMilli);
155                    if (endTime.compareTo(jobEndTime) > 0) {
156                        endTime = jobEndTime;
157                    }
158                    jobId = bundleBean.getId();
159                    LOG.info("[" + jobId + "]: Update status to PREP");
160                    bundleBean.setStatus(Job.Status.PREP);
161                    try {
162                        new XConfiguration(new StringReader(bundleBean.getConf()));
163                    }
164                    catch (IOException e1) {
165                        LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
166                    }
167                    String output = bundleBean.getJobXml() + System.getProperty("line.separator");
168                    return output;
169                }
170                else {
171                    if (bundleBean.getKickoffTime() == null) {
172                        // If there is no KickOffTime, default kickoff is NOW.
173                        LOG.debug("Since kickoff time is not defined for job id " + jobId
174                                + ". Queuing and BundleStartXCommand immediately after submission");
175                        queue(new BundleStartXCommand(jobId));
176                    }
177                }
178            }
179            catch (Exception ex) {
180                throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
181            }
182            LOG.info("ENDED Bundle Submit");
183            return this.jobId;
184        }
185    
186        /* (non-Javadoc)
187         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
188         */
189        @Override
190        public void notifyParent() throws CommandException {
191        }
192    
193        /* (non-Javadoc)
194         * @see org.apache.oozie.command.XCommand#getEntityKey()
195         */
196        @Override
197        public String getEntityKey() {
198            return null;
199        }
200    
201        /* (non-Javadoc)
202         * @see org.apache.oozie.command.XCommand#isLockRequired()
203         */
204        @Override
205        protected boolean isLockRequired() {
206            return false;
207        }
208    
209        /* (non-Javadoc)
210         * @see org.apache.oozie.command.XCommand#loadState()
211         */
212        @Override
213        protected void loadState() throws CommandException {
214        }
215    
216        /* (non-Javadoc)
217         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
218         */
219        @Override
220        protected void verifyPrecondition() throws CommandException, PreconditionException {
221        }
222    
223        /* (non-Javadoc)
224         * @see org.apache.oozie.command.XCommand#eagerLoadState()
225         */
226        @Override
227        protected void eagerLoadState() throws CommandException {
228            super.eagerLoadState();
229            jpaService = Services.get().get(JPAService.class);
230            if (jpaService == null) {
231                throw new CommandException(ErrorCode.E0610);
232            }
233        }
234    
235        /* (non-Javadoc)
236         * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
237         */
238        @Override
239        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
240            try {
241                super.eagerVerifyPrecondition();
242                mergeDefaultConfig();
243                String appXml = readAndValidateXml();
244                bundleBean.setOrigJobXml(appXml);
245                LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
246            }
247            catch (BundleJobException ex) {
248                LOG.warn("BundleJobException:  ", ex);
249                throw new CommandException(ex);
250            }
251            catch (IllegalArgumentException iex) {
252                LOG.warn("IllegalArgumentException:  ", iex);
253                throw new CommandException(ErrorCode.E1310, iex);
254            }
255            catch (Exception ex) {
256                LOG.warn("Exception:  ", ex);
257                throw new CommandException(ErrorCode.E1310, ex);
258            }
259        }
260    
261        /**
262         * Merge default configuration with user-defined configuration.
263         *
264         * @throws CommandException thrown if failed to merge configuration
265         */
266        protected void mergeDefaultConfig() throws CommandException {
267            Path configDefault = null;
268            try {
269                String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
270                Path bundleAppPath = new Path(bundleAppPathStr);
271                String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
272                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
273                Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
274                FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
275    
276                // app path could be a directory
277                if (!fs.isFile(bundleAppPath)) {
278                    configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
279                } else {
280                    configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
281                }
282    
283                if (fs.exists(configDefault)) {
284                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
285                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
286                    XConfiguration.injectDefaults(defaultConf, conf);
287                }
288                else {
289                    LOG.info("configDefault Doesn't exist " + configDefault);
290                }
291                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
292            }
293            catch (IOException e) {
294                throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
295                        + configDefault, e);
296            }
297            catch (HadoopAccessorException e) {
298                throw new CommandException(e);
299            }
300            LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
301        }
302    
303        /**
304         * Read the application XML and validate against bundle Schema
305         *
306         * @return validated bundle XML
307         * @throws BundleJobException thrown if failed to read or validate xml
308         */
309        private String readAndValidateXml() throws BundleJobException {
310            String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
311            String bundleXml = readDefinition(appPath);
312            validateXml(bundleXml);
313            return bundleXml;
314        }
315    
316        /**
317         * Read bundle definition.
318         *
319         * @param appPath application path.
320         * @param user user name.
321         * @param group group name.
322         * @param autToken authentication token.
323         * @return bundle definition.
324         * @throws BundleJobException thrown if the definition could not be read.
325         */
326        protected String readDefinition(String appPath) throws BundleJobException {
327            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
328            //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
329            try {
330                URI uri = new URI(appPath);
331                LOG.debug("user =" + user);
332                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
333                Configuration fsConf = has.createJobConf(uri.getAuthority());
334                FileSystem fs = has.createFileSystem(user, uri, fsConf);
335                Path appDefPath = null;
336    
337                // app path could be a directory
338                Path path = new Path(uri.getPath());
339                if (!fs.isFile(path)) {
340                    appDefPath = new Path(path, BUNDLE_XML_FILE);
341                } else {
342                    appDefPath = path;
343                }
344    
345                Reader reader = new InputStreamReader(fs.open(appDefPath));
346                StringWriter writer = new StringWriter();
347                IOUtils.copyCharStream(reader, writer);
348                return writer.toString();
349            }
350            catch (IOException ex) {
351                LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
352                throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
353            }
354            catch (URISyntaxException ex) {
355                LOG.warn("URISyException :" + ex.getMessage());
356                throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
357            }
358            catch (HadoopAccessorException ex) {
359                throw new BundleJobException(ex);
360            }
361            catch (Exception ex) {
362                LOG.warn("Exception :", ex);
363                throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
364            }
365        }
366    
367        /**
368         * Validate against Bundle XSD file
369         *
370         * @param xmlContent input bundle xml
371         * @throws BundleJobException thrown if failed to validate xml
372         */
373        private void validateXml(String xmlContent) throws BundleJobException {
374            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
375            Validator validator = schema.newValidator();
376            try {
377                validator.validate(new StreamSource(new StringReader(xmlContent)));
378            }
379            catch (SAXException ex) {
380                LOG.warn("SAXException :", ex);
381                throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
382            }
383            catch (IOException ex) {
384                LOG.warn("IOException :", ex);
385                throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
386            }
387        }
388    
389        /**
390         * Write a Bundle Job into database
391         *
392         * @param Bundle job bean
393         * @return job id
394         * @throws CommandException thrown if failed to store bundle job bean to db
395         */
396        private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
397            try {
398                jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
399    
400                bundleJob.setId(jobId);
401                bundleJob.setAuthToken(this.authToken);
402                bundleJob.setAppName(XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name"));
403                bundleJob.setAppName(bundleJob.getAppName());
404                bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
405                // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
406                bundleJob.setCreatedTime(new Date());
407                bundleJob.setUser(conf.get(OozieClient.USER_NAME));
408                String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
409                bundleJob.setGroup(group);
410                bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
411                bundleJob.setJobXml(resolvedJobXml);
412                Element jobElement = XmlUtils.parseXml(resolvedJobXml);
413                Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
414                if (controlsElement != null) {
415                    Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
416                    if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
417                        Date kickoffTime = DateUtils.parseDateUTC(kickoffTimeElement.getValue());
418                        bundleJob.setKickoffTime(kickoffTime);
419                    }
420                }
421                bundleJob.setLastModifiedTime(new Date());
422    
423                if (!dryrun) {
424                    jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
425                }
426            }
427            catch (Exception ex) {
428                throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
429            }
430            return jobId;
431        }
432    
433        /* (non-Javadoc)
434         * @see org.apache.oozie.command.TransitionXCommand#getJob()
435         */
436        @Override
437        public Job getJob() {
438            return bundleBean;
439        }
440    
441        /**
442         * Resolve job xml with conf
443         *
444         * @param bundleXml bundle job xml
445         * @param conf job configuration
446         * @return resolved job xml
447         * @throws BundleJobException thrown if failed to resolve variables
448         */
449        private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
450            try {
451                ELEvaluator eval = createEvaluator(conf);
452                return eval.evaluate(bundleXml, String.class);
453            }
454            catch (Exception e) {
455                throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
456            }
457        }
458    
459        /**
460         * Create ELEvaluator
461         *
462         * @param conf job configuration
463         * @return ELEvaluator the evaluator for el function
464         * @throws BundleJobException thrown if failed to create evaluator
465         */
466        public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
467            ELEvaluator eval;
468            ELEvaluator.Context context;
469            try {
470                context = new ELEvaluator.Context();
471                eval = new ELEvaluator(context);
472                for (Map.Entry<String, String> entry : conf) {
473                    eval.setVariable(entry.getKey(), entry.getValue());
474                }
475            }
476            catch (Exception e) {
477                throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
478            }
479            return eval;
480        }
481    
482        /**
483         * Verify the uniqueness of coordinator names
484         *
485         * @param resolved job xml
486         * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
487         */
488        @SuppressWarnings("unchecked")
489        private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
490            Set<String> set = new HashSet<String>();
491            try {
492                Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
493                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
494                for (Element elem : coordElems) {
495                    Attribute name = elem.getAttribute("name");
496                    if (name != null) {
497                        if (set.contains(name.getValue())) {
498                            throw new CommandException(ErrorCode.E1304, name);
499                        }
500                        set.add(name.getValue());
501                    }
502                    else {
503                        throw new CommandException(ErrorCode.E1305);
504                    }
505                }
506            }
507            catch (JDOMException jex) {
508                throw new CommandException(ErrorCode.E1301, jex);
509            }
510    
511            return null;
512        }
513    
514        /* (non-Javadoc)
515         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
516         */
517        @Override
518        public void updateJob() throws CommandException {
519        }
520    }