001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.bundle;
020
021import java.io.IOException;
022import java.io.InputStreamReader;
023import java.io.Reader;
024import java.io.StringReader;
025import java.io.StringWriter;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.util.Date;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033
034import javax.xml.transform.stream.StreamSource;
035import javax.xml.validation.Validator;
036
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.oozie.BundleJobBean;
041import org.apache.oozie.ErrorCode;
042import org.apache.oozie.client.Job;
043import org.apache.oozie.client.OozieClient;
044import org.apache.oozie.command.CommandException;
045import org.apache.oozie.command.PreconditionException;
046import org.apache.oozie.command.SubmitTransitionXCommand;
047import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
048import org.apache.oozie.service.ELService;
049import org.apache.oozie.service.HadoopAccessorException;
050import org.apache.oozie.service.HadoopAccessorService;
051import org.apache.oozie.service.SchemaService;
052import org.apache.oozie.service.SchemaService.SchemaName;
053import org.apache.oozie.service.Services;
054import org.apache.oozie.service.UUIDService;
055import org.apache.oozie.service.UUIDService.ApplicationType;
056import org.apache.oozie.util.ConfigUtils;
057import org.apache.oozie.util.DateUtils;
058import org.apache.oozie.util.ELEvaluator;
059import org.apache.oozie.util.ELUtils;
060import org.apache.oozie.util.IOUtils;
061import org.apache.oozie.util.InstrumentUtils;
062import org.apache.oozie.util.LogUtils;
063import org.apache.oozie.util.ParamChecker;
064import org.apache.oozie.util.ParameterVerifier;
065import org.apache.oozie.util.PropertiesUtils;
066import org.apache.oozie.util.XConfiguration;
067import org.apache.oozie.util.XmlUtils;
068import org.jdom.Attribute;
069import org.jdom.Element;
070import org.jdom.JDOMException;
071import org.xml.sax.SAXException;
072
073/**
074 * This Command will submit the bundle.
075 */
076public class BundleSubmitXCommand extends SubmitTransitionXCommand {
077
078    private Configuration conf;
079    public static final String CONFIG_DEFAULT = "bundle-config-default.xml";
080    public static final String BUNDLE_XML_FILE = "bundle.xml";
081    private final BundleJobBean bundleBean = new BundleJobBean();
082    private String jobId;
083    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
084    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
085
086    static {
087        String[] badUserProps = { PropertiesUtils.YEAR, PropertiesUtils.MONTH, PropertiesUtils.DAY,
088                PropertiesUtils.HOUR, PropertiesUtils.MINUTE, PropertiesUtils.DAYS, PropertiesUtils.HOURS,
089                PropertiesUtils.MINUTES, PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB,
090                PropertiesUtils.TB, PropertiesUtils.PB, PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN,
091                PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS };
092        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
093
094        String[] badDefaultProps = { PropertiesUtils.HADOOP_USER};
095        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
096        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
097    }
098
099    /**
100     * Constructor to create the bundle submit command.
101     *
102     * @param conf configuration for bundle job
103     */
104    public BundleSubmitXCommand(Configuration conf) {
105        super("bundle_submit", "bundle_submit", 1);
106        this.conf = ParamChecker.notNull(conf, "conf");
107    }
108
109    /**
110     * Constructor to create the bundle submit command.
111     *
112     * @param dryrun true if dryrun is enable
113     * @param conf configuration for bundle job
114     */
115    public BundleSubmitXCommand(boolean dryrun, Configuration conf) {
116        this(conf);
117        this.dryrun = dryrun;
118    }
119
120    /* (non-Javadoc)
121     * @see org.apache.oozie.command.SubmitTransitionXCommand#submit()
122     */
123    @Override
124    protected String submit() throws CommandException {
125        LOG.info("STARTED Bundle Submit");
126        try {
127            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
128
129            ParameterVerifier.verifyParameters(conf, XmlUtils.parseXml(bundleBean.getOrigJobXml()));
130
131            String jobXmlWithNoComment = XmlUtils.removeComments(this.bundleBean.getOrigJobXml().toString());
132            // Resolving all variables in the job properties.
133            // This ensures the Hadoop Configuration semantics is preserved.
134            XConfiguration resolvedVarsConf = new XConfiguration();
135            for (Map.Entry<String, String> entry : conf) {
136                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
137            }
138            conf = resolvedVarsConf;
139
140            String resolvedJobXml = resolvedVarsandFunctions(jobXmlWithNoComment, conf);
141
142            //verify the uniqueness of coord names
143            verifyCoordNameUnique(resolvedJobXml);
144            this.jobId = storeToDB(bundleBean, resolvedJobXml);
145            LogUtils.setLogInfo(bundleBean);
146
147            if (dryrun) {
148                Date startTime = bundleBean.getStartTime();
149                long startTimeMilli = startTime.getTime();
150                long endTimeMilli = startTimeMilli + (3600 * 1000);
151                Date jobEndTime = bundleBean.getEndTime();
152                Date endTime = new Date(endTimeMilli);
153                if (endTime.compareTo(jobEndTime) > 0) {
154                    endTime = jobEndTime;
155                }
156                jobId = bundleBean.getId();
157                LOG.info("[" + jobId + "]: Update status to PREP");
158                bundleBean.setStatus(Job.Status.PREP);
159                try {
160                    new XConfiguration(new StringReader(bundleBean.getConf()));
161                }
162                catch (IOException e1) {
163                    LOG.warn("Configuration parse error. read from DB :" + bundleBean.getConf(), e1);
164                }
165                String output = bundleBean.getJobXml() + System.getProperty("line.separator");
166                return output;
167            }
168            else {
169                if (bundleBean.getKickoffTime() == null) {
170                    // If there is no KickOffTime, default kickoff is NOW.
171                    LOG.debug("Since kickoff time is not defined for job id " + jobId
172                            + ". Queuing and BundleStartXCommand immediately after submission");
173                    queue(new BundleStartXCommand(jobId));
174                }
175            }
176        }
177        catch (Exception ex) {
178            throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
179        }
180        LOG.info("ENDED Bundle Submit");
181        return this.jobId;
182    }
183
184    /* (non-Javadoc)
185     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
186     */
187    @Override
188    public void notifyParent() throws CommandException {
189    }
190
191    /* (non-Javadoc)
192     * @see org.apache.oozie.command.XCommand#getEntityKey()
193     */
194    @Override
195    public String getEntityKey() {
196        return null;
197    }
198
199    /* (non-Javadoc)
200     * @see org.apache.oozie.command.XCommand#isLockRequired()
201     */
202    @Override
203    protected boolean isLockRequired() {
204        return false;
205    }
206
207    @Override
208    protected void loadState() throws CommandException {
209    }
210
211    @Override
212    protected void verifyPrecondition() throws CommandException, PreconditionException {
213    }
214
215    @Override
216    protected void eagerLoadState() throws CommandException {
217    }
218
219    @Override
220    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
221        try {
222            mergeDefaultConfig();
223            String appXml = readAndValidateXml();
224            bundleBean.setOrigJobXml(appXml);
225            LOG.debug("jobXml after initial validation " + XmlUtils.prettyPrint(appXml).toString());
226        }
227        catch (BundleJobException ex) {
228            LOG.warn("BundleJobException:  ", ex);
229            throw new CommandException(ex);
230        }
231        catch (IllegalArgumentException iex) {
232            LOG.warn("IllegalArgumentException:  ", iex);
233            throw new CommandException(ErrorCode.E1310, iex.getMessage(), iex);
234        }
235        catch (Exception ex) {
236            LOG.warn("Exception:  ", ex);
237            throw new CommandException(ErrorCode.E1310, ex.getMessage(), ex);
238        }
239    }
240
241    /**
242     * Merge default configuration with user-defined configuration.
243     *
244     * @throws CommandException thrown if failed to merge configuration
245     */
246    protected void mergeDefaultConfig() throws CommandException {
247        Path configDefault = null;
248        try {
249            String bundleAppPathStr = conf.get(OozieClient.BUNDLE_APP_PATH);
250            Path bundleAppPath = new Path(bundleAppPathStr);
251            String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
252            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
253            Configuration fsConf = has.createJobConf(bundleAppPath.toUri().getAuthority());
254            FileSystem fs = has.createFileSystem(user, bundleAppPath.toUri(), fsConf);
255
256            // app path could be a directory
257            if (!fs.isFile(bundleAppPath)) {
258                configDefault = new Path(bundleAppPath, CONFIG_DEFAULT);
259            } else {
260                configDefault = new Path(bundleAppPath.getParent(), CONFIG_DEFAULT);
261            }
262
263            if (fs.exists(configDefault)) {
264                Configuration defaultConf = new XConfiguration(fs.open(configDefault));
265                PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
266                XConfiguration.injectDefaults(defaultConf, conf);
267            }
268            else {
269                LOG.info("configDefault Doesn't exist " + configDefault);
270            }
271            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
272        }
273        catch (IOException e) {
274            throw new CommandException(ErrorCode.E0702, e.getMessage() + " : Problem reading default config "
275                    + configDefault, e);
276        }
277        catch (HadoopAccessorException e) {
278            throw new CommandException(e);
279        }
280        LOG.debug("Merged CONF :" + XmlUtils.prettyPrint(conf).toString());
281    }
282
283    /**
284     * Read the application XML and validate against bundle Schema
285     *
286     * @return validated bundle XML
287     * @throws BundleJobException thrown if failed to read or validate xml
288     */
289    private String readAndValidateXml() throws BundleJobException {
290        String appPath = ParamChecker.notEmpty(conf.get(OozieClient.BUNDLE_APP_PATH), OozieClient.BUNDLE_APP_PATH);
291        String bundleXml = readDefinition(appPath);
292        validateXml(bundleXml);
293        return bundleXml;
294    }
295
296    /**
297     * Read bundle definition.
298     *
299     * @param appPath application path.
300     * @return bundle definition.
301     * @throws BundleJobException thrown if the definition could not be read.
302     */
303    protected String readDefinition(String appPath) throws BundleJobException {
304        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
305        //Configuration confHadoop = CoordUtils.getHadoopConf(conf);
306        try {
307            URI uri = new URI(appPath);
308            LOG.debug("user =" + user);
309            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
310            Configuration fsConf = has.createJobConf(uri.getAuthority());
311            FileSystem fs = has.createFileSystem(user, uri, fsConf);
312            Path appDefPath = null;
313
314            // app path could be a directory
315            Path path = new Path(uri.getPath());
316            if (!fs.isFile(path)) {
317                appDefPath = new Path(path, BUNDLE_XML_FILE);
318            } else {
319                appDefPath = path;
320            }
321
322            Reader reader = new InputStreamReader(fs.open(appDefPath));
323            StringWriter writer = new StringWriter();
324            IOUtils.copyCharStream(reader, writer);
325            return writer.toString();
326        }
327        catch (IOException ex) {
328            LOG.warn("IOException :" + XmlUtils.prettyPrint(conf), ex);
329            throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
330        }
331        catch (URISyntaxException ex) {
332            LOG.warn("URISyException :" + ex.getMessage());
333            throw new BundleJobException(ErrorCode.E1302, appPath, ex.getMessage(), ex);
334        }
335        catch (HadoopAccessorException ex) {
336            throw new BundleJobException(ex);
337        }
338        catch (Exception ex) {
339            LOG.warn("Exception :", ex);
340            throw new BundleJobException(ErrorCode.E1301, ex.getMessage(), ex);
341        }
342    }
343
344    /**
345     * Validate against Bundle XSD file
346     *
347     * @param xmlContent input bundle xml
348     * @throws BundleJobException thrown if failed to validate xml
349     */
350    private void validateXml(String xmlContent) throws BundleJobException {
351        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.BUNDLE);
352        Validator validator = schema.newValidator();
353        try {
354            validator.validate(new StreamSource(new StringReader(xmlContent)));
355        }
356        catch (SAXException ex) {
357            LOG.warn("SAXException :", ex);
358            throw new BundleJobException(ErrorCode.E0701, ex.getMessage(), ex);
359        }
360        catch (IOException ex) {
361            LOG.warn("IOException :", ex);
362            throw new BundleJobException(ErrorCode.E0702, ex.getMessage(), ex);
363        }
364    }
365
366    /**
367     * Write a Bundle Job into database
368     *
369     * @param Bundle job bean
370     * @return job id
371     * @throws CommandException thrown if failed to store bundle job bean to db
372     */
373    private String storeToDB(BundleJobBean bundleJob, String resolvedJobXml) throws CommandException {
374        try {
375            jobId = Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE);
376
377            bundleJob.setId(jobId);
378            String name = XmlUtils.parseXml(bundleBean.getOrigJobXml()).getAttributeValue("name");
379            name = ELUtils.resolveAppName(name, conf);
380            bundleJob.setAppName(name);
381            bundleJob.setAppPath(conf.get(OozieClient.BUNDLE_APP_PATH));
382            // bundleJob.setStatus(BundleJob.Status.PREP); //This should be set in parent class.
383            bundleJob.setCreatedTime(new Date());
384            bundleJob.setUser(conf.get(OozieClient.USER_NAME));
385            String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
386            bundleJob.setGroup(group);
387            bundleJob.setConf(XmlUtils.prettyPrint(conf).toString());
388            bundleJob.setJobXml(resolvedJobXml);
389            Element jobElement = XmlUtils.parseXml(resolvedJobXml);
390            Element controlsElement = jobElement.getChild("controls", jobElement.getNamespace());
391            if (controlsElement != null) {
392                Element kickoffTimeElement = controlsElement.getChild("kick-off-time", jobElement.getNamespace());
393                if (kickoffTimeElement != null && !kickoffTimeElement.getValue().isEmpty()) {
394                    Date kickoffTime = DateUtils.parseDateOozieTZ(kickoffTimeElement.getValue());
395                    bundleJob.setKickoffTime(kickoffTime);
396                }
397            }
398            bundleJob.setLastModifiedTime(new Date());
399
400            if (!dryrun) {
401                BundleJobQueryExecutor.getInstance().insert(bundleJob);
402            }
403        }
404        catch (Exception ex) {
405            throw new CommandException(ErrorCode.E1301, ex.getMessage(), ex);
406        }
407        return jobId;
408    }
409
410    /* (non-Javadoc)
411     * @see org.apache.oozie.command.TransitionXCommand#getJob()
412     */
413    @Override
414    public Job getJob() {
415        return bundleBean;
416    }
417
418    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
419        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
420        setConfigToEval(eval, conf);
421        return eval;
422    }
423
424    private static void setConfigToEval(ELEvaluator eval, Configuration conf) {
425        for (Map.Entry<String, String> entry : conf) {
426            eval.setVariable(entry.getKey(), entry.getValue().trim());
427        }
428    }
429
430    /**
431     * Resolve job xml with conf
432     *
433     * @param bundleXml bundle job xml
434     * @param conf job configuration
435     * @return resolved job xml
436     * @throws BundleJobException thrown if failed to resolve variables
437     */
438    private String resolvedVarsandFunctions(String bundleXml, Configuration conf) throws BundleJobException {
439        ELEvaluator eval;
440        try {
441            eval = createELEvaluatorForGroup(conf, "bundle-submit");
442            return eval.evaluate(bundleXml, String.class);
443        }
444        catch (Exception e) {
445            throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
446        }
447    }
448
449    /**
450     * Create ELEvaluator
451     *
452     * @param conf job configuration
453     * @return ELEvaluator the evaluator for el function
454     * @throws BundleJobException thrown if failed to create evaluator
455     */
456    public ELEvaluator createEvaluator(Configuration conf) throws BundleJobException {
457        ELEvaluator eval;
458        ELEvaluator.Context context;
459        try {
460            context = new ELEvaluator.Context();
461            eval = new ELEvaluator(context);
462            for (Map.Entry<String, String> entry : conf) {
463                eval.setVariable(entry.getKey(), entry.getValue());
464            }
465        }
466        catch (Exception e) {
467            throw new BundleJobException(ErrorCode.E1004, e.getMessage(), e);
468        }
469        return eval;
470    }
471
472    /**
473     * Verify the uniqueness of coordinator names
474     *
475     * @param resolved job xml
476     * @throws CommandException thrown if failed to verify the uniqueness of coordinator names
477     */
478    @SuppressWarnings("unchecked")
479    private Void verifyCoordNameUnique(String resolvedJobXml) throws CommandException {
480        Set<String> set = new HashSet<String>();
481        try {
482            Element bAppXml = XmlUtils.parseXml(resolvedJobXml);
483            List<Element> coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
484            for (Element elem : coordElems) {
485                Attribute name = elem.getAttribute("name");
486                if (name != null) {
487                    String coordName = name.getValue();
488                    try {
489                        coordName = ELUtils.resolveAppName(name.getValue(), conf);
490                    }
491                    catch (Exception e) {
492                        throw new CommandException(ErrorCode.E1321, e.getMessage(), e);
493                    }
494                    if (set.contains(coordName)) {
495                        throw new CommandException(ErrorCode.E1304, name);
496                    }
497                    set.add(coordName);
498                }
499                else {
500                    throw new CommandException(ErrorCode.E1305);
501                }
502            }
503        }
504        catch (JDOMException jex) {
505            throw new CommandException(ErrorCode.E1301, jex.getMessage(), jex);
506        }
507
508        return null;
509    }
510
511    /* (non-Javadoc)
512     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
513     */
514    @Override
515    public void updateJob() throws CommandException {
516    }
517
518    @Override
519    public void performWrites() throws CommandException {
520    }
521}