001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.bundle;
019    
020    import java.io.IOException;
021    import java.io.InputStreamReader;
022    import java.io.Reader;
023    import java.io.StringReader;
024    import java.io.StringWriter;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.Date;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    
033    import javax.xml.transform.stream.StreamSource;
034    import javax.xml.validation.Validator;
035    
036    import org.apache.hadoop.conf.Configuration;
037    import org.apache.hadoop.fs.FileSystem;
038    import org.apache.hadoop.fs.Path;
039    import org.apache.oozie.BundleJobBean;
040    import org.apache.oozie.ErrorCode;
041    import org.apache.oozie.client.Job;
042    import org.apache.oozie.client.OozieClient;
043    import org.apache.oozie.command.CommandException;
044    import org.apache.oozie.command.PreconditionException;
045    import org.apache.oozie.command.SubmitTransitionXCommand;
046    import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
047    import org.apache.oozie.service.HadoopAccessorException;
048    import org.apache.oozie.service.HadoopAccessorService;
049    import org.apache.oozie.service.JPAService;
050    import org.apache.oozie.service.SchemaService;
051    import org.apache.oozie.service.Services;
052    import org.apache.oozie.service.UUIDService;
053    import org.apache.oozie.service.WorkflowAppService;
054    import org.apache.oozie.service.SchemaService.SchemaName;
055    import org.apache.oozie.service.UUIDService.ApplicationType;
056    import org.apache.oozie.util.DateUtils;
057    import org.apache.oozie.util.ELEvaluator;
058    import org.apache.oozie.util.IOUtils;
059    import org.apache.oozie.util.InstrumentUtils;
060    import org.apache.oozie.util.LogUtils;
061    import org.apache.oozie.util.ParamChecker;
062    import org.apache.oozie.util.PropertiesUtils;
063    import org.apache.oozie.util.XConfiguration;
064    import org.apache.oozie.util.XmlUtils;
065    import org.jdom.Attribute;
066    import org.jdom.Element;
067    import org.jdom.JDOMException;
068    import org.xml.sax.SAXException;
069    
070    /**
071     * This Command will submit the bundle.
072     */
073    public class BundleSubmitXCommand extends SubmitTransitionXCommand {
074    
075        private Configuration conf;
076        private final String authToken;
077        public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
078        public static final String BUNDLE_XML_FILE = "bundle.xml";
079        private final BundleJobBean bundleBean = new BundleJobBean();
080        private String jobId;
081        private JPAService jpaService = null;
082    
083        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
084        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
085    
086        static {
087            String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
088                    PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
089                    PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
090                    PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
091                    PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
092            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
093    
094            String[] badDefaultProps = { PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
095                    WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME };
096            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
097            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
098        }
099    
100        /**
101         * Constructor to create the bundle submit command.
102         *
103         * @param conf configuration for bundle job
104         * @param authToken to be used for authentication
105         */
106        public BundleSubmitXCommand(Configuration conf, String authToken) {
107            super("bundle_submit", "bundle_submit", 1);
108            this.conf = ParamChecker.notNull(conf, "conf");
109            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
110        }
111    
112        /**
113         * Constructor to create the bundle submit command.
114         *
115         * @param dryrun true if dryrun is enable
116         * @param conf configuration for bundle job
117         * @param authToken to be used for authentication
118         */
119        public BundleSubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
120            this(conf, authToken);
121            this.dryrun = dryrun;
122        }
123    
124        /* (non-Javadoc)
125         * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
126         */
127        @Override
128        protected String submit() throws CommandException {
129            LOG.info("STARTED Bundle Submit");
130            try {
131                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
132    
133                XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
134                // Resolving all variables in the job properties.
135                // This ensures the Hadoop Configuration semantics is preserved.
136                XConfiguration resolvedVarsConf = new XConfiguration();
137                for (Map.Entry<String, String> entry : conf) {
138                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
139                }
140                conf = resolvedVarsConf;
141    
142                String resolvedJobXml = resolvedVars(bundleBean.getOrigJobXml(), conf);
143    
144                //verify the uniqueness of coord names
145                verifyCoordNameUnique(resolvedJobXml);
146                this.jobId = storeToDB(bundleBean, resolvedJobXml);
147                LogUtils.setLogInfo(bundleBean, logInfo);
148    
149                if (dryrun) {
150                    Date startTime = bundleBean.getStartTime();
151                    long startTimeMilli = startTime.getTime();
152                    long endTimeMilli = startTimeMilli + (3600 * 1000);
153                    Date jobEndTime = bundleBean.getEndTime();
154                    Date endTime = new Date(endTimeMilli);
155                    if (endTime.compareTo(jobEndTime) > 0) {
156                        endTime = jobEndTime;
157                    }
158                    jobId = bundleBean.getId();
159                    LOG.info("[" + jobId + "]: Update status to PREP");
160                    bundleBean.setStatus(Job.Status.PREP);
161                    try {
162                        new XConfiguration(new StringReader(bundleBean.getConf()));
163                    }
164                    catch (IOException e1) {
165                        LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
166                    }
167                    String output = bundleBean.getJobXml() + System.getProperty("line.separator");
168                    return output;
169                }
170                else {
171                    if (bundleBean.getKickoffTime() == null) {
172                        // If there is no KickOffTime, default kickoff is NOW.
173                        LOG.debug("Since kickoff time is not defined for job id " + jobId
174                                + ". Queuing and BundleStartXCommand immediately after submission");
175                        queue(new BundleStartXCommand(jobId));
176                    }
177                }
178            }
179            catch (Exception ex) {
180                throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
181            }
182            LOG.info("ENDED Bundle Submit");
183            return this.jobId;
184        }
185    
186        /* (non-Javadoc)
187         * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
188         */
189        @Override
190        public void notifyParent() throws CommandException {
191        }
192    
193        /* (non-Javadoc)
194         * @see org.apache.oozie.command.XCommand#getEntityKey()
195         */
196        @Override
197        protected String getEntityKey() {
198            return null;
199        }
200    
201        /* (non-Javadoc)
202         * @see org.apache.oozie.command.XCommand#isLockRequired()
203         */
204        @Override
205        protected boolean isLockRequired() {
206            return false;
207        }
208    
209        /* (non-Javadoc)
210         * @see org.apache.oozie.command.XCommand#loadState()
211         */
212        @Override
213        protected void loadState() throws CommandException {
214        }
215    
216        /* (non-Javadoc)
217         * @see org.apache.oozie.command.XCommand#verifyPrecondition()
218         */
219        @Override
220        protected void verifyPrecondition() throws CommandException, PreconditionException {
221        }
222    
223        /* (non-Javadoc)
224         * @see org.apache.oozie.command.XCommand#eagerLoadState()
225         */
226        @Override
227        protected void eagerLoadState() throws CommandException {
228            super.eagerLoadState();
229            jpaService = Services.get().get(JPAService.class);
230            if (jpaService == null) {
231                throw new CommandException(ErrorCode.E0610);
232            }
233        }
234    
235        /* (non-Javadoc)
236         * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
237         */
238        @Override
239        protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
240            try {
241                super.eagerVerifyPrecondition();
242                mergeDefaultConfig();
243                String appXml = readAndValidateXml();
244                bundleBean.setOrigJobXml(appXml);
245                LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
246            }
247            catch (BundleJobException ex) {
248                LOG.warn("BundleJobException:  ", ex);
249                throw new CommandException(ex);
250            }
251            catch (IllegalArgumentException iex) {
252                LOG.warn("IllegalArgumentException:  ", iex);
253                throw new CommandException(ErrorCode.E1310, iex);
254            }
255            catch (Exception ex) {
256                LOG.warn("Exception:  ", ex);
257                throw new CommandException(ErrorCode.E1310, ex);
258            }
259        }
260    
261        /**
262         * Merge default configuration with user-defined configuration.
263         *
264         * @throws CommandException thrown if failed to merge configuration
265         */
266        protected void mergeDefaultConfig() throws CommandException {
267            Path configDefault = null;
268            try {
269                String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
270                Path bundleAppPath = new Path(bundleAppPathStr);
271                String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
272                String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
273                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, bundleAppPath.toUri(),
274                        new Configuration());
275    
276                // app path could be a directory
277                if (!fs.isFile(bundleAppPath)) {
278                    configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
279                } else {
280                    configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
281                }
282    
283                if (fs.exists(configDefault)) {
284                    Configuration defaultConf = new XConfiguration(fs.open(configDefault));
285                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
286                    XConfiguration.injectDefaults(defaultConf, conf);
287                }
288                else {
289                    LOG.info("configDefault Doesn't exist " + configDefault);
290                }
291                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
292            }
293            catch (IOException e) {
294                throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
295                        + configDefault, e);
296            }
297            catch (HadoopAccessorException e) {
298                throw new CommandException(e);
299            }
300            LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
301        }
302    
303        /**
304         * Read the application XML and validate against bundle Schema
305         *
306         * @return validated bundle XML
307         * @throws BundleJobException thrown if failed to read or validate xml
308         */
309        private String readAndValidateXml() throws BundleJobException {
310            String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
311            String bundleXml = readDefinition(appPath);
312            validateXml(bundleXml);
313            return bundleXml;
314        }
315    
316        /**
317         * Read bundle definition.
318         *
319         * @param appPath application path.
320         * @param user user name.
321         * @param group group name.
322         * @param autToken authentication token.
323         * @return bundle definition.
324         * @throws BundleJobException thrown if the definition could not be read.
325         */
326        protected String readDefinition(String appPath) throws BundleJobException {
327            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
328            String group = ParamChecker.notEmpty(conf.get(OozieClient.GROUP_NAME), OozieClient.GROUP_NAME);
329            //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
330            try {
331                URI uri = new URI(appPath);
332                LOG.debug("user =" + user + " group =" + group);
333                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, uri,
334                        new Configuration());
335                Path appDefPath = null;
336    
337                // app path could be a directory
338                Path path = new Path(uri.getPath());
339                if (!fs.isFile(path)) {
340                    appDefPath = new Path(path, BUNDLE_XML_FILE);
341                } else {
342                    appDefPath = path;
343                }
344    
345                Reader reader = new InputStreamReader(fs.open(appDefPath));
346                StringWriter writer = new StringWriter();
347                IOUtils.copyCharStream(reader, writer);
348                return writer.toString();
349            }
350            catch (IOException ex) {
351                LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
352                throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
353            }
354            catch (URISyntaxException ex) {
355                LOG.warn("URISyException :" + ex.getMessage());
356                throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
357            }
358            catch (HadoopAccessorException ex) {
359                throw new BundleJobException(ex);
360            }
361            catch (Exception ex) {
362                LOG.warn("Exception :", ex);
363                throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
364            }
365        }
366    
367        /**
368         * Validate against Bundle XSD file
369         *
370         * @param xmlContent input bundle xml
371         * @throws BundleJobException thrown if failed to validate xml
372         */
373        private void validateXml(String xmlContent) throws BundleJobException {
374            javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
375            Validator validator = schema.newValidator();
376            try {
377                validator.validate(new StreamSource(new StringReader(xmlContent)));
378            }
379            catch (SAXException ex) {
380                LOG.warn("SAXException :", ex);
381                throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
382            }
383            catch (IOException ex) {
384                LOG.warn("IOException :", ex);
385                throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
386            }
387        }
388    
389        /**
390         * Write a Bundle Job into database
391         *
392         * @param Bundle job bean
393         * @return job id
394         * @throws CommandException thrown if failed to store bundle job bean to db
395         */
396        private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
397            try {
398                jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
399    
400                bundleJob.setId(jobId);
401                bundleJob.setAuthToken(this.authToken);
402                bundleJob.setAppName(XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name"));
403                bundleJob.setAppName(bundleJob.getAppName());
404                bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
405                // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
406                bundleJob.setCreatedTime(new Date());
407                bundleJob.setUser(conf.get(OozieClient.USER_NAME));
408                bundleJob.setGroup(conf.get(OozieClient.GROUP_NAME));
409                bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
410                bundleJob.setJobXml(resolvedJobXml);
411                Element jobElement = XmlUtils.parseXml(resolvedJobXml);
412                Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
413                if (controlsElement != null) {
414                    Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
415                    if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
416                        Date kickoffTime = DateUtils.parseDateUTC(kickoffTimeElement.getValue());
417                        bundleJob.setKickoffTime(kickoffTime);
418                    }
419                }
420                bundleJob.setLastModifiedTime(new Date());
421    
422                if (!dryrun) {
423                    jpaService.execute(new BundleJobInsertJPAExecutor(bundleJob));
424                }
425            }
426            catch (Exception ex) {
427                throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
428            }
429            return jobId;
430        }
431    
432        /* (non-Javadoc)
433         * @see org.apache.oozie.command.TransitionXCommand#getJob()
434         */
435        @Override
436        public Job getJob() {
437            return bundleBean;
438        }
439    
440        /**
441         * Resolve job xml with conf
442         *
443         * @param bundleXml bundle job xml
444         * @param conf job configuration
445         * @return resolved job xml
446         * @throws BundleJobException thrown if failed to resolve variables
447         */
448        private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
449            try {
450                ELEvaluator eval = createEvaluator(conf);
451                return eval.evaluate(bundleXml, String.class);
452            }
453            catch (Exception e) {
454                throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
455            }
456        }
457    
458        /**
459         * Create ELEvaluator
460         *
461         * @param conf job configuration
462         * @return ELEvaluator the evaluator for el function
463         * @throws BundleJobException thrown if failed to create evaluator
464         */
465        public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
466            ELEvaluator eval;
467            ELEvaluator.Context context;
468            try {
469                context = new ELEvaluator.Context();
470                eval = new ELEvaluator(context);
471                for (Map.Entry<String, String> entry : conf) {
472                    eval.setVariable(entry.getKey(), entry.getValue());
473                }
474            }
475            catch (Exception e) {
476                throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
477            }
478            return eval;
479        }
480    
481        /**
482         * Verify the uniqueness of coordinator names
483         *
484         * @param resolved job xml
485         * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
486         */
487        @SuppressWarnings("unchecked")
488        private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
489            Set<String> set = new HashSet<String>();
490            try {
491                Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
492                List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
493                for (Element elem : coordElems) {
494                    Attribute name = elem.getAttribute("name");
495                    if (name != null) {
496                        if (set.contains(name.getValue())) {
497                            throw new CommandException(ErrorCode.E1304, name);
498                        }
499                        set.add(name.getValue());
500                    }
501                    else {
502                        throw new CommandException(ErrorCode.E1305);
503                    }
504                }
505            }
506            catch (JDOMException jex) {
507                throw new CommandException(ErrorCode.E1301, jex);
508            }
509    
510            return null;
511        }
512    
513        /* (non-Javadoc)
514         * @see org.apache.oozie.command.TransitionXCommand#updateJob()
515         */
516        @Override
517        public void updateJob() throws CommandException {
518        }
519    }