001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.oozie.AppType;
024    import org.apache.oozie.SLAEventBean;
025    import org.apache.oozie.WorkflowJobBean;
026    import org.apache.oozie.ErrorCode;
027    import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
028    import org.apache.oozie.service.HadoopAccessorException;
029    import org.apache.oozie.service.JPAService;
030    import org.apache.oozie.service.UUIDService;
031    import org.apache.oozie.service.WorkflowStoreService;
032    import org.apache.oozie.service.WorkflowAppService;
033    import org.apache.oozie.service.HadoopAccessorService;
034    import org.apache.oozie.service.Services;
035    import org.apache.oozie.service.DagXLogInfoService;
036    import org.apache.oozie.util.ConfigUtils;
037    import org.apache.oozie.util.ELUtils;
038    import org.apache.oozie.util.LogUtils;
039    import org.apache.oozie.sla.SLAOperations;
040    import org.apache.oozie.util.XLog;
041    import org.apache.oozie.util.ParamChecker;
042    import org.apache.oozie.util.XConfiguration;
043    import org.apache.oozie.util.XmlUtils;
044    import org.apache.oozie.command.CommandException;
045    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
046    import org.apache.oozie.executor.jpa.JPAExecutorException;
047    import org.apache.oozie.service.ELService;
048    import org.apache.oozie.store.StoreException;
049    import org.apache.oozie.workflow.WorkflowApp;
050    import org.apache.oozie.workflow.WorkflowException;
051    import org.apache.oozie.workflow.WorkflowInstance;
052    import org.apache.oozie.workflow.WorkflowLib;
053    import org.apache.oozie.util.ELEvaluator;
054    import org.apache.oozie.util.InstrumentUtils;
055    import org.apache.oozie.util.PropertiesUtils;
056    import org.apache.oozie.util.db.SLADbOperations;
057    import org.apache.oozie.service.SchemaService.SchemaName;
058    import org.apache.oozie.client.OozieClient;
059    import org.apache.oozie.client.WorkflowJob;
060    import org.apache.oozie.client.SLAEvent.SlaAppType;
061    import org.apache.oozie.client.rest.JsonBean;
062    import org.jdom.Element;
063    import java.util.ArrayList;
064    import java.util.Date;
065    import java.util.List;
066    import java.util.Map;
067    import java.util.Set;
068    import java.util.HashSet;
069    import java.io.IOException;
070    import java.net.URI;
071    
072    @SuppressWarnings("deprecation")
073    public class SubmitXCommand extends WorkflowXCommand<String> {
074        public static final String CONFIG_DEFAULT = "config-default.xml";
075    
076        private Configuration conf;
077        private List<JsonBean> insertList = new ArrayList<JsonBean>();
078        private String parentId;
079    
080        /**
081         * Constructor to create the workflow Submit Command.
082         *
083         * @param conf : Configuration for workflow job
084         */
085        public SubmitXCommand(Configuration conf) {
086            super("submit", "submit", 1);
087            this.conf = ParamChecker.notNull(conf, "conf");
088        }
089    
090        /**
091         * Constructor for submitting wf through coordinator
092         *
093         * @param conf : Configuration for workflow job
094         * @param parentId: the coord action id
095         */
096        public SubmitXCommand(Configuration conf, String parentId) {
097            this(conf);
098            this.parentId = parentId;
099        }
100    
101        /**
102         * Constructor to create the workflow Submit Command.
103         *
104         * @param dryrun : if dryrun
105         * @param conf : Configuration for workflow job
106         * @param authToken : To be used for authentication
107         */
108        public SubmitXCommand(boolean dryrun, Configuration conf) {
109            this(conf);
110            this.dryrun = dryrun;
111        }
112    
113        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
114        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
115    
116        static {
117            String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
118                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
119                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
120                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
121            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
122    
123            String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
124            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
125            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
126        }
127    
128        @Override
129        protected String execute() throws CommandException {
130            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
131            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
132            try {
133                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
134                WorkflowApp app = wps.parseDef(conf);
135                XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
136                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
137    
138                String user = conf.get(OozieClient.USER_NAME);
139                String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
140                URI uri = new URI(conf.get(OozieClient.APP_PATH));
141                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
142                Configuration fsConf = has.createJobConf(uri.getAuthority());
143                FileSystem fs = has.createFileSystem(user, uri, fsConf);
144    
145                Path configDefault = null;
146                // app path could be a directory
147                Path path = new Path(uri.getPath());
148                if (!fs.isFile(path)) {
149                    configDefault = new Path(path, CONFIG_DEFAULT);
150                } else {
151                    configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
152                }
153    
154                if (fs.exists(configDefault)) {
155                    try {
156                        Configuration defaultConf = new XConfiguration(fs.open(configDefault));
157                        PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
158                        XConfiguration.injectDefaults(defaultConf, conf);
159                    }
160                    catch (IOException ex) {
161                        throw new IOException("default configuration file, " + ex.getMessage(), ex);
162                    }
163                }
164    
165                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
166    
167                // Resolving all variables in the job properties.
168                // This ensures the Hadoop Configuration semantics is preserved.
169                XConfiguration resolvedVarsConf = new XConfiguration();
170                for (Map.Entry<String, String> entry : conf) {
171                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
172                }
173                conf = resolvedVarsConf;
174    
175                WorkflowInstance wfInstance;
176                try {
177                    wfInstance = workflowLib.createInstance(app, conf);
178                }
179                catch (WorkflowException e) {
180                    throw new StoreException(e);
181                }
182    
183                Configuration conf = wfInstance.getConf();
184                // System.out.println("WF INSTANCE CONF:");
185                // System.out.println(XmlUtils.prettyPrint(conf).toString());
186    
187                WorkflowJobBean workflow = new WorkflowJobBean();
188                workflow.setId(wfInstance.getId());
189                workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
190                workflow.setAppPath(conf.get(OozieClient.APP_PATH));
191                workflow.setConf(XmlUtils.prettyPrint(conf).toString());
192                workflow.setProtoActionConf(protoActionConf.toXmlString());
193                workflow.setCreatedTime(new Date());
194                workflow.setLastModifiedTime(new Date());
195                workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
196                workflow.setStatus(WorkflowJob.Status.PREP);
197                workflow.setRun(0);
198                workflow.setUser(conf.get(OozieClient.USER_NAME));
199                workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
200                workflow.setWorkflowInstance(wfInstance);
201                workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
202                // Set parent id if it doesn't already have one (for subworkflows)
203                if (workflow.getParentId() == null) {
204                    workflow.setParentId(conf.get(SubWorkflowActionExecutor.PARENT_ID));
205                }
206                // Set to coord action Id if workflow submitted through coordinator
207                if (workflow.getParentId() == null) {
208                    workflow.setParentId(parentId);
209                }
210    
211                LogUtils.setLogInfo(workflow, logInfo);
212                LOG = XLog.resetPrefix(LOG);
213                LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
214                Element wfElem = XmlUtils.parseXml(app.getDefinition());
215                ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
216                String jobSlaXml = verifySlaElements(wfElem, evalSla);
217                if (!dryrun) {
218                    writeSLARegistration(wfElem, jobSlaXml, workflow.getId(), workflow.getParentId(), workflow.getUser(),
219                            workflow.getGroup(), workflow.getAppName(), LOG, evalSla);
220                    workflow.setSlaXml(jobSlaXml);
221                    // System.out.println("SlaXml :"+ slaXml);
222    
223                    //store.insertWorkflow(workflow);
224                    insertList.add(workflow);
225                    JPAService jpaService = Services.get().get(JPAService.class);
226                    if (jpaService != null) {
227                        try {
228                            jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
229                        }
230                        catch (JPAExecutorException je) {
231                            throw new CommandException(je);
232                        }
233                    }
234                    else {
235                        LOG.error(ErrorCode.E0610);
236                        return null;
237                    }
238    
239                    return workflow.getId();
240                }
241                else {
242                    return "OK";
243                }
244            }
245            catch (WorkflowException ex) {
246                throw new CommandException(ex);
247            }
248            catch (HadoopAccessorException ex) {
249                throw new CommandException(ex);
250            }
251            catch (Exception ex) {
252                throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
253            }
254        }
255    
256        private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
257            String jobSlaXml = "";
258            // Validate WF job
259            Element eSla = XmlUtils.getSLAElement(eWfJob);
260            if (eSla != null) {
261                jobSlaXml = resolveSla(eSla, evalSla);
262            }
263    
264            // Validate all actions
265            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
266                eSla = XmlUtils.getSLAElement(eWfJob);
267                if (eSla != null) {
268                    resolveSla(eSla, evalSla);
269                }
270            }
271            return jobSlaXml;
272        }
273    
274        private void writeSLARegistration(Element eWfJob, String slaXml, String jobId, String parentId, String user,
275                String group, String appName, XLog log, ELEvaluator evalSla) throws CommandException {
276            try {
277                if (slaXml != null && slaXml.length() > 0) {
278                    Element eSla = XmlUtils.parseXml(slaXml);
279                    SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, jobId,
280                            SlaAppType.WORKFLOW_JOB, user, group, log);
281                    if(slaEvent != null) {
282                        insertList.add(slaEvent);
283                    }
284                    // insert into new table
285                    SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName,
286                            log, false);
287                }
288                // Add sla for wf actions
289                for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
290                    Element actionSla = XmlUtils.getSLAElement(action);
291                    if (actionSla != null) {
292                        String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
293                        actionSla = XmlUtils.parseXml(actionSlaXml);
294                        String actionId = Services.get().get(UUIDService.class)
295                                .generateChildId(jobId, action.getAttributeValue("name") + "");
296                        SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION,
297                                user, appName, log, false);
298                    }
299                }
300            }
301            catch (Exception e) {
302                e.printStackTrace();
303                throw new CommandException(ErrorCode.E1007, "workflow " + jobId, e.getMessage(), e);
304            }
305        }
306    
307        /**
308         * Resolve variables in sla xml element.
309         *
310         * @param eSla sla xml element
311         * @param evalSla sla evaluator
312         * @return sla xml string after evaluation
313         * @throws CommandException
314         */
315        public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
316            // EL evaluation
317            String slaXml = XmlUtils.prettyPrint(eSla).toString();
318            try {
319                slaXml = XmlUtils.removeComments(slaXml);
320                slaXml = evalSla.evaluate(slaXml, String.class);
321                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
322                return slaXml;
323            }
324            catch (Exception e) {
325                throw new CommandException(ErrorCode.E1004, "Validation error :" + e.getMessage(), e);
326            }
327        }
328    
329        /**
330         * Create an EL evaluator for a given group.
331         *
332         * @param conf configuration variable
333         * @param group group variable
334         * @return the evaluator created for the group
335         */
336        public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
337            ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
338            for (Map.Entry<String, String> entry : conf) {
339                eval.setVariable(entry.getKey(), entry.getValue());
340            }
341            return eval;
342        }
343    
344        @Override
345        public String getEntityKey() {
346            return null;
347        }
348    
349        @Override
350        protected boolean isLockRequired() {
351            return false;
352        }
353    
354        @Override
355        protected void loadState() {
356    
357        }
358    
359        @Override
360        protected void verifyPrecondition() throws CommandException {
361    
362        }
363    
364    }