001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.fs.Path;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.oozie.AppType;
025import org.apache.oozie.SLAEventBean;
026import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
027import org.apache.oozie.WorkflowJobBean;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
030import org.apache.oozie.service.HadoopAccessorException;
031import org.apache.oozie.service.JPAService;
032import org.apache.oozie.service.UUIDService;
033import org.apache.oozie.service.WorkflowStoreService;
034import org.apache.oozie.service.WorkflowAppService;
035import org.apache.oozie.service.HadoopAccessorService;
036import org.apache.oozie.service.Services;
037import org.apache.oozie.service.DagXLogInfoService;
038import org.apache.oozie.util.ELUtils;
039import org.apache.oozie.util.LogUtils;
040import org.apache.oozie.sla.SLAOperations;
041import org.apache.oozie.util.XLog;
042import org.apache.oozie.util.ParamChecker;
043import org.apache.oozie.util.XConfiguration;
044import org.apache.oozie.util.XmlUtils;
045import org.apache.oozie.command.CommandException;
046import org.apache.oozie.executor.jpa.BatchQueryExecutor;
047import org.apache.oozie.executor.jpa.JPAExecutorException;
048import org.apache.oozie.service.ELService;
049import org.apache.oozie.store.StoreException;
050import org.apache.oozie.workflow.WorkflowApp;
051import org.apache.oozie.workflow.WorkflowException;
052import org.apache.oozie.workflow.WorkflowInstance;
053import org.apache.oozie.workflow.WorkflowLib;
054import org.apache.oozie.util.ELEvaluator;
055import org.apache.oozie.util.InstrumentUtils;
056import org.apache.oozie.util.PropertiesUtils;
057import org.apache.oozie.util.db.SLADbOperations;
058import org.apache.oozie.service.SchemaService.SchemaName;
059import org.apache.oozie.client.OozieClient;
060import org.apache.oozie.client.WorkflowJob;
061import org.apache.oozie.client.SLAEvent.SlaAppType;
062import org.apache.oozie.client.rest.JsonBean;
063import org.jdom.Element;
064import org.jdom.filter.ElementFilter;
065
066import java.util.ArrayList;
067import java.util.Date;
068import java.util.Iterator;
069import java.util.List;
070import java.util.Map;
071import java.util.Set;
072import java.util.HashSet;
073import java.io.IOException;
074import java.net.URI;
075
076@SuppressWarnings("deprecation")
077public class SubmitXCommand extends WorkflowXCommand<String> {
078    public static final String CONFIG_DEFAULT = "config-default.xml";
079
080    private Configuration conf;
081    private List<JsonBean> insertList = new ArrayList<JsonBean>();
082    private String parentId;
083
084    /**
085     * Constructor to create the workflow Submit Command.
086     *
087     * @param conf : Configuration for workflow job
088     */
089    public SubmitXCommand(Configuration conf) {
090        super("submit", "submit", 1);
091        this.conf = ParamChecker.notNull(conf, "conf");
092    }
093
094    /**
095     * Constructor for submitting wf through coordinator
096     *
097     * @param conf : Configuration for workflow job
098     * @param parentId: the coord action id
099     */
100    public SubmitXCommand(Configuration conf, String parentId) {
101        this(conf);
102        this.parentId = parentId;
103    }
104
105    /**
106     * Constructor to create the workflow Submit Command.
107     *
108     * @param dryrun : if dryrun
109     * @param conf : Configuration for workflow job
110     */
111    public SubmitXCommand(boolean dryrun, Configuration conf) {
112        this(conf);
113        this.dryrun = dryrun;
114    }
115
116    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
117    private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
118
119    static {
120        String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
121                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
122                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
123                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
124        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
125
126        String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
127        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
128        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
129    }
130
131    @Override
132    protected String execute() throws CommandException {
133        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
134        WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
135        try {
136            XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
137            String user = conf.get(OozieClient.USER_NAME);
138            URI uri = new URI(conf.get(OozieClient.APP_PATH));
139            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
140            Configuration fsConf = has.createJobConf(uri.getAuthority());
141            FileSystem fs = has.createFileSystem(user, uri, fsConf);
142
143            Path configDefault = null;
144            Configuration defaultConf = null;
145            // app path could be a directory
146            Path path = new Path(uri.getPath());
147            if (!fs.isFile(path)) {
148                configDefault = new Path(path, CONFIG_DEFAULT);
149            } else {
150                configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
151            }
152
153            if (fs.exists(configDefault)) {
154                try {
155                    defaultConf = new XConfiguration(fs.open(configDefault));
156                    PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
157                    XConfiguration.injectDefaults(defaultConf, conf);
158                }
159                catch (IOException ex) {
160                    throw new IOException("default configuration file, " + ex.getMessage(), ex);
161                }
162            }
163            if (defaultConf != null) {
164                defaultConf = resolveDefaultConfVariables(defaultConf);
165            }
166
167            WorkflowApp app = wps.parseDef(conf, defaultConf);
168            XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
169            WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
170
171            PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
172
173            // Resolving all variables in the job properties.
174            // This ensures the Hadoop Configuration semantics is preserved.
175            XConfiguration resolvedVarsConf = new XConfiguration();
176            for (Map.Entry<String, String> entry : conf) {
177                resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
178            }
179            conf = resolvedVarsConf;
180
181            WorkflowInstance wfInstance;
182            try {
183                wfInstance = workflowLib.createInstance(app, conf);
184            }
185            catch (WorkflowException e) {
186                throw new StoreException(e);
187            }
188
189            Configuration conf = wfInstance.getConf();
190            // System.out.println("WF INSTANCE CONF:");
191            // System.out.println(XmlUtils.prettyPrint(conf).toString());
192
193            WorkflowJobBean workflow = new WorkflowJobBean();
194            workflow.setId(wfInstance.getId());
195            workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
196            workflow.setAppPath(conf.get(OozieClient.APP_PATH));
197            workflow.setConf(XmlUtils.prettyPrint(conf).toString());
198            workflow.setProtoActionConf(protoActionConf.toXmlString());
199            workflow.setCreatedTime(new Date());
200            workflow.setLastModifiedTime(new Date());
201            workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
202            workflow.setStatus(WorkflowJob.Status.PREP);
203            workflow.setRun(0);
204            workflow.setUser(conf.get(OozieClient.USER_NAME));
205            workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
206            workflow.setWorkflowInstance(wfInstance);
207            workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
208            // Set parent id if it doesn't already have one (for subworkflows)
209            if (workflow.getParentId() == null) {
210                workflow.setParentId(conf.get(SubWorkflowActionExecutor.PARENT_ID));
211            }
212            // Set to coord action Id if workflow submitted through coordinator
213            if (workflow.getParentId() == null) {
214                workflow.setParentId(parentId);
215            }
216
217            LogUtils.setLogInfo(workflow);
218            LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
219            Element wfElem = XmlUtils.parseXml(app.getDefinition());
220            ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
221            String jobSlaXml = verifySlaElements(wfElem, evalSla);
222            if (!dryrun) {
223                writeSLARegistration(wfElem, jobSlaXml, workflow.getId(), workflow.getParentId(), workflow.getUser(),
224                        workflow.getGroup(), workflow.getAppName(), LOG, evalSla);
225                workflow.setSlaXml(jobSlaXml);
226                // System.out.println("SlaXml :"+ slaXml);
227
228                //store.insertWorkflow(workflow);
229                insertList.add(workflow);
230                JPAService jpaService = Services.get().get(JPAService.class);
231                if (jpaService != null) {
232                    try {
233                        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
234                    }
235                    catch (JPAExecutorException je) {
236                        throw new CommandException(je);
237                    }
238                }
239                else {
240                    LOG.error(ErrorCode.E0610);
241                    return null;
242                }
243
244                return workflow.getId();
245            }
246            else {
247                // Checking variable substitution for dryrun
248                ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(workflow, null, false, false);
249                Element workflowXml = XmlUtils.parseXml(app.getDefinition());
250                removeSlaElements(workflowXml);
251                String workflowXmlString = XmlUtils.removeComments(XmlUtils.prettyPrint(workflowXml).toString());
252                workflowXmlString = context.getELEvaluator().evaluate(workflowXmlString, String.class);
253                workflowXml = XmlUtils.parseXml(workflowXmlString);
254
255                Iterator<Element> it = workflowXml.getDescendants(new ElementFilter("job-xml"));
256
257                // Checking all variable substitutions in job-xml files
258                while (it.hasNext()) {
259                    Element e = it.next();
260                    String jobXml = e.getTextTrim();
261                    Path xmlPath = new Path(workflow.getAppPath(), jobXml);
262                    Configuration jobXmlConf = new XConfiguration(fs.open(xmlPath));
263
264
265                    String jobXmlConfString = XmlUtils.prettyPrint(jobXmlConf).toString();
266                    jobXmlConfString = XmlUtils.removeComments(jobXmlConfString);
267                    context.getELEvaluator().evaluate(jobXmlConfString, String.class);
268                }
269
270                return "OK";
271            }
272        }
273        catch (WorkflowException ex) {
274            throw new CommandException(ex);
275        }
276        catch (HadoopAccessorException ex) {
277            throw new CommandException(ex);
278        }
279        catch (Exception ex) {
280            throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
281        }
282    }
283
284    /**
285     * Resolving variables from config-default, which might be referencing into conf/defaultConf
286     * @param defaultConf config-default.xml
287     * @return resolved config-default configuration.
288     */
289    private Configuration resolveDefaultConfVariables(Configuration defaultConf) {
290        XConfiguration resolveDefaultConf = new XConfiguration();
291        for (Map.Entry<String, String> entry : defaultConf) {
292            String defaultConfKey = entry.getKey();
293            String defaultConfValue = entry.getValue();
294            // if value is referencing some other key, first check within the default config to resolve,
295            // then job.properties (conf)
296            if (defaultConfValue.contains("$") && defaultConf.get(defaultConfKey).contains("$")) {
297                resolveDefaultConf.set(defaultConfKey, conf.get(defaultConfKey));
298            } else {
299                resolveDefaultConf.set(defaultConfKey, defaultConf.get(defaultConfKey));
300            }
301        }
302        return resolveDefaultConf;
303    }
304
305    private void removeSlaElements(Element eWfJob) {
306        Element sla = XmlUtils.getSLAElement(eWfJob);
307        if (sla != null) {
308            eWfJob.removeChildren(sla.getName(), sla.getNamespace());
309        }
310
311        for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
312            sla = XmlUtils.getSLAElement(action);
313            if (sla != null) {
314                action.removeChildren(sla.getName(), sla.getNamespace());
315            }
316        }
317    }
318    private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
319        String jobSlaXml = "";
320        // Validate WF job
321        Element eSla = XmlUtils.getSLAElement(eWfJob);
322        if (eSla != null) {
323            jobSlaXml = resolveSla(eSla, evalSla);
324        }
325
326        // Validate all actions
327        for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
328            eSla = XmlUtils.getSLAElement(action);
329            if (eSla != null) {
330                resolveSla(eSla, evalSla);
331            }
332        }
333        return jobSlaXml;
334    }
335
336    private void writeSLARegistration(Element eWfJob, String slaXml, String jobId, String parentId, String user,
337            String group, String appName, XLog log, ELEvaluator evalSla) throws CommandException {
338        try {
339            if (slaXml != null && slaXml.length() > 0) {
340                Element eSla = XmlUtils.parseXml(slaXml);
341                SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, jobId,
342                        SlaAppType.WORKFLOW_JOB, user, group, log);
343                if(slaEvent != null) {
344                    insertList.add(slaEvent);
345                }
346                // insert into new table
347                SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName,
348                        log, false);
349            }
350            // Add sla for wf actions
351            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
352                Element actionSla = XmlUtils.getSLAElement(action);
353                if (actionSla != null) {
354                    String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
355                    actionSla = XmlUtils.parseXml(actionSlaXml);
356                    String actionId = Services.get().get(UUIDService.class)
357                            .generateChildId(jobId, action.getAttributeValue("name") + "");
358                    SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION,
359                            user, appName, log, false);
360                }
361            }
362        }
363        catch (Exception e) {
364            e.printStackTrace();
365            throw new CommandException(ErrorCode.E1007, "workflow " + jobId, e.getMessage(), e);
366        }
367    }
368
369    /**
370     * Resolve variables in sla xml element.
371     *
372     * @param eSla sla xml element
373     * @param evalSla sla evaluator
374     * @return sla xml string after evaluation
375     * @throws CommandException
376     */
377    public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
378        // EL evaluation
379        String slaXml = XmlUtils.prettyPrint(eSla).toString();
380        try {
381            slaXml = XmlUtils.removeComments(slaXml);
382            slaXml = evalSla.evaluate(slaXml, String.class);
383            XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
384            return slaXml;
385        }
386        catch (Exception e) {
387            throw new CommandException(ErrorCode.E1004, "Validation error :" + e.getMessage(), e);
388        }
389    }
390
391    /**
392     * Create an EL evaluator for a given group.
393     *
394     * @param conf configuration variable
395     * @param group group variable
396     * @return the evaluator created for the group
397     */
398    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
399        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
400        for (Map.Entry<String, String> entry : conf) {
401            eval.setVariable(entry.getKey(), entry.getValue());
402        }
403        return eval;
404    }
405
406    @Override
407    public String getEntityKey() {
408        return null;
409    }
410
411    @Override
412    protected boolean isLockRequired() {
413        return false;
414    }
415
416    @Override
417    protected void loadState() {
418
419    }
420
421    @Override
422    protected void verifyPrecondition() throws CommandException {
423
424    }
425
426}