001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.bundle;
019
020import java.io.IOException;
021import java.io.InputStreamReader;
022import java.io.Reader;
023import java.io.StringReader;
024import java.io.StringWriter;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.util.Date;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032
033import javax.xml.transform.stream.StreamSource;
034import javax.xml.validation.Validator;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.oozie.BundleJobBean;
040import org.apache.oozie.ErrorCode;
041import org.apache.oozie.client.Job;
042import org.apache.oozie.client.OozieClient;
043import org.apache.oozie.command.CommandException;
044import org.apache.oozie.command.PreconditionException;
045import org.apache.oozie.command.SubmitTransitionXCommand;
046import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
047import org.apache.oozie.service.HadoopAccessorException;
048import org.apache.oozie.service.HadoopAccessorService;
049import org.apache.oozie.service.SchemaService;
050import org.apache.oozie.service.Services;
051import org.apache.oozie.service.UUIDService;
052import org.apache.oozie.service.SchemaService.SchemaName;
053import org.apache.oozie.service.UUIDService.ApplicationType;
054import org.apache.oozie.util.ConfigUtils;
055import org.apache.oozie.util.DateUtils;
056import org.apache.oozie.util.ELEvaluator;
057import org.apache.oozie.util.ELUtils;
058import org.apache.oozie.util.IOUtils;
059import org.apache.oozie.util.InstrumentUtils;
060import org.apache.oozie.util.LogUtils;
061import org.apache.oozie.util.ParamChecker;
062import org.apache.oozie.util.PropertiesUtils;
063import org.apache.oozie.util.XConfiguration;
064import org.apache.oozie.util.XmlUtils;
065import org.apache.oozie.util.ParameterVerifier;
066import org.jdom.Attribute;
067import org.jdom.Element;
068import org.jdom.JDOMException;
069import org.xml.sax.SAXException;
070
071/**
072 * This Command will submit the bundle.
073 */
074public class BundleSubmitXCommand extends SubmitTransitionXCommand {
075
076    private Configuration conf;
077    public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
078    public static final String BUNDLE_XML_FILE = "bundle.xml";
079    private final BundleJobBean bundleBean = new BundleJobBean();
080    private String jobId;
081    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
082    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
083
084    static {
085        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
086                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
087                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
088                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
089                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
090        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
091
092        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
093        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
094        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
095    }
096
097    /**
098     * Constructor to create the bundle submit command.
099     *
100     * @param conf configuration for bundle job
101     */
102    public BundleSubmitXCommand(Configuration conf) {
103        super("bundle_submit", "bundle_submit", 1);
104        this.conf = ParamChecker.notNull(conf, "conf");
105    }
106
107    /**
108     * Constructor to create the bundle submit command.
109     *
110     * @param dryrun true if dryrun is enable
111     * @param conf configuration for bundle job
112     */
113    public BundleSubmitXCommand(boolean dryrun, Configuration conf) {
114        this(conf);
115        this.dryrun = dryrun;
116    }
117
118    /* (non-Javadoc)
119     * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
120     */
121    @Override
122    protected String submit() throws CommandException {
123        LOG.info("STARTED Bundle Submit");
124        try {
125            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
126
127            ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml()));
128
129            String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
130            // Resolving all variables in the job properties.
131            // This ensures the Hadoop Configuration semantics is preserved.
132            XConfiguration resolvedVarsConf = new XConfiguration();
133            for (Map.Entry<String, String> entry : conf) {
134                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
135            }
136            conf = resolvedVarsConf;
137
138            String resolvedJobXml = resolvedVars(jobXmlWithNoComment, conf);
139
140            //verify the uniqueness of coord names
141            verifyCoordNameUnique(resolvedJobXml);
142            this.jobId = storeToDB(bundleBean, resolvedJobXml);
143            LogUtils.setLogInfo(bundleBean, logInfo);
144
145            if (dryrun) {
146                Date startTime = bundleBean.getStartTime();
147                long startTimeMilli = startTime.getTime();
148                long endTimeMilli = startTimeMilli + (3600 * 1000);
149                Date jobEndTime = bundleBean.getEndTime();
150                Date endTime = new Date(endTimeMilli);
151                if (endTime.compareTo(jobEndTime) > 0) {
152                    endTime = jobEndTime;
153                }
154                jobId = bundleBean.getId();
155                LOG.info("[" + jobId + "]: Update status to PREP");
156                bundleBean.setStatus(Job.Status.PREP);
157                try {
158                    new XConfiguration(new StringReader(bundleBean.getConf()));
159                }
160                catch (IOException e1) {
161                    LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
162                }
163                String output = bundleBean.getJobXml() + System.getProperty("line.separator");
164                return output;
165            }
166            else {
167                if (bundleBean.getKickoffTime() == null) {
168                    // If there is no KickOffTime, default kickoff is NOW.
169                    LOG.debug("Since kickoff time is not defined for job id " + jobId
170                            + ". Queuing and BundleStartXCommand immediately after submission");
171                    queue(new BundleStartXCommand(jobId));
172                }
173            }
174        }
175        catch (Exception ex) {
176            throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
177        }
178        LOG.info("ENDED Bundle Submit");
179        return this.jobId;
180    }
181
182    /* (non-Javadoc)
183     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
184     */
185    @Override
186    public void notifyParent() throws CommandException {
187    }
188
189    /* (non-Javadoc)
190     * @see org.apache.oozie.command.XCommand#getEntityKey()
191     */
192    @Override
193    public String getEntityKey() {
194        return null;
195    }
196
197    /* (non-Javadoc)
198     * @see org.apache.oozie.command.XCommand#isLockRequired()
199     */
200    @Override
201    protected boolean isLockRequired() {
202        return false;
203    }
204
205    @Override
206    protected void loadState() throws CommandException {
207    }
208
209    @Override
210    protected void verifyPrecondition() throws CommandException, PreconditionException {
211    }
212
213    @Override
214    protected void eagerLoadState() throws CommandException {
215    }
216
217    @Override
218    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
219        try {
220            mergeDefaultConfig();
221            String appXml = readAndValidateXml();
222            bundleBean.setOrigJobXml(appXml);
223            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
224        }
225        catch (BundleJobException ex) {
226            LOG.warn("BundleJobException:  ", ex);
227            throw new CommandException(ex);
228        }
229        catch (IllegalArgumentException iex) {
230            LOG.warn("IllegalArgumentException:  ", iex);
231            throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex);
232        }
233        catch (Exception ex) {
234            LOG.warn("Exception:  ", ex);
235            throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
236        }
237    }
238
239    /**
240     * Merge default configuration with user-defined configuration.
241     *
242     * @throws CommandException thrown if failed to merge configuration
243     */
244    protected void mergeDefaultConfig() throws CommandException {
245        Path configDefault = null;
246        try {
247            String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
248            Path bundleAppPath = new Path(bundleAppPathStr);
249            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
250            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
251            Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
252            FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
253
254            // app path could be a directory
255            if (!fs.isFile(bundleAppPath)) {
256                configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
257            } else {
258                configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
259            }
260
261            if (fs.exists(configDefault)) {
262                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
263                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
264                XConfiguration.injectDefaults(defaultConf, conf);
265            }
266            else {
267                LOG.info("configDefault Doesn't exist " + configDefault);
268            }
269            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
270        }
271        catch (IOException e) {
272            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
273                    + configDefault, e);
274        }
275        catch (HadoopAccessorException e) {
276            throw new CommandException(e);
277        }
278        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
279    }
280
281    /**
282     * Read the application XML and validate against bundle Schema
283     *
284     * @return validated bundle XML
285     * @throws BundleJobException thrown if failed to read or validate xml
286     */
287    private String readAndValidateXml() throws BundleJobException {
288        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
289        String bundleXml = readDefinition(appPath);
290        validateXml(bundleXml);
291        return bundleXml;
292    }
293
294    /**
295     * Read bundle definition.
296     *
297     * @param appPath application path.
298     * @param user user name.
299     * @param group group name.
300     * @return bundle definition.
301     * @throws BundleJobException thrown if the definition could not be read.
302     */
303    protected String readDefinition(String appPath) throws BundleJobException {
304        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
305        //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
306        try {
307            URI uri = new URI(appPath);
308            LOG.debug("user =" + user);
309            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
310            Configuration fsConf = has.createJobConf(uri.getAuthority());
311            FileSystem fs = has.createFileSystem(user, uri, fsConf);
312            Path appDefPath = null;
313
314            // app path could be a directory
315            Path path = new Path(uri.getPath());
316            if (!fs.isFile(path)) {
317                appDefPath = new Path(path, BUNDLE_XML_FILE);
318            } else {
319                appDefPath = path;
320            }
321
322            Reader reader = new InputStreamReader(fs.open(appDefPath));
323            StringWriter writer = new StringWriter();
324            IOUtils.copyCharStream(reader, writer);
325            return writer.toString();
326        }
327        catch (IOException ex) {
328            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
329            throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
330        }
331        catch (URISyntaxException ex) {
332            LOG.warn("URISyException :" + ex.getMessage());
333            throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
334        }
335        catch (HadoopAccessorException ex) {
336            throw new BundleJobException(ex);
337        }
338        catch (Exception ex) {
339            LOG.warn("Exception :", ex);
340            throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
341        }
342    }
343
344    /**
345     * Validate against Bundle XSD file
346     *
347     * @param xmlContent input bundle xml
348     * @throws BundleJobException thrown if failed to validate xml
349     */
350    private void validateXml(String xmlContent) throws BundleJobException {
351        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
352        Validator validator = schema.newValidator();
353        try {
354            validator.validate(new StreamSource(new StringReader(xmlContent)));
355        }
356        catch (SAXException ex) {
357            LOG.warn("SAXException :", ex);
358            throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
359        }
360        catch (IOException ex) {
361            LOG.warn("IOException :", ex);
362            throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
363        }
364    }
365
366    /**
367     * Write a Bundle Job into database
368     *
369     * @param Bundle job bean
370     * @return job id
371     * @throws CommandException thrown if failed to store bundle job bean to db
372     */
373    private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
374        try {
375            jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
376
377            bundleJob.setId(jobId);
378            String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name");
379            name = ELUtils.resolveAppName(name, conf);
380            bundleJob.setAppName(name);
381            bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
382            // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
383            bundleJob.setCreatedTime(new Date());
384            bundleJob.setUser(conf.get(OozieClient.USER_NAME));
385            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
386            bundleJob.setGroup(group);
387            bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
388            bundleJob.setJobXml(resolvedJobXml);
389            Element jobElement = XmlUtils.parseXml(resolvedJobXml);
390            Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
391            if (controlsElement != null) {
392                Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
393                if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
394                    Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue());
395                    bundleJob.setKickoffTime(kickoffTime);
396                }
397            }
398            bundleJob.setLastModifiedTime(new Date());
399
400            if (!dryrun) {
401                BundleJobQueryExecutor.getInstance().insert(bundleJob);
402            }
403        }
404        catch (Exception ex) {
405            throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
406        }
407        return jobId;
408    }
409
410    /* (non-Javadoc)
411     * @see org.apache.oozie.command.TransitionXCommand#getJob()
412     */
413    @Override
414    public Job getJob() {
415        return bundleBean;
416    }
417
418    /**
419     * Resolve job xml with conf
420     *
421     * @param bundleXml bundle job xml
422     * @param conf job configuration
423     * @return resolved job xml
424     * @throws BundleJobException thrown if failed to resolve variables
425     */
426    private String resolvedVars(String bundleXml, Configuration conf) throws BundleJobException {
427        try {
428            ELEvaluator eval = createEvaluator(conf);
429            return eval.evaluate(bundleXml, String.class);
430        }
431        catch (Exception e) {
432            throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
433        }
434    }
435
436    /**
437     * Create ELEvaluator
438     *
439     * @param conf job configuration
440     * @return ELEvaluator the evaluator for el function
441     * @throws BundleJobException thrown if failed to create evaluator
442     */
443    public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
444        ELEvaluator eval;
445        ELEvaluator.Context context;
446        try {
447            context = new ELEvaluator.Context();
448            eval = new ELEvaluator(context);
449            for (Map.Entry<String, String> entry : conf) {
450                eval.setVariable(entry.getKey(), entry.getValue());
451            }
452        }
453        catch (Exception e) {
454            throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
455        }
456        return eval;
457    }
458
459    /**
460     * Verify the uniqueness of coordinator names
461     *
462     * @param resolved job xml
463     * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
464     */
465    @SuppressWarnings("unchecked")
466    private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
467        Set<String> set = new HashSet<String>();
468        try {
469            Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
470            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
471            for (Element elem : coordElems) {
472                Attribute name = elem.getAttribute("name");
473                if (name != null) {
474                    if (set.contains(name.getValue())) {
475                        throw new CommandException(ErrorCode.E1304, name);
476                    }
477                    set.add(name.getValue());
478                }
479                else {
480                    throw new CommandException(ErrorCode.E1305);
481                }
482            }
483        }
484        catch (JDOMException jex) {
485            throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
486        }
487
488        return null;
489    }
490
491    /* (non-Javadoc)
492     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
493     */
494    @Override
495    public void updateJob() throws CommandException {
496    }
497
498    @Override
499    public void performWrites() throws CommandException {
500    }
501}