001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.oozie.SLAEventBean;
024    import org.apache.oozie.WorkflowJobBean;
025    import org.apache.oozie.ErrorCode;
026    import org.apache.oozie.service.HadoopAccessorException;
027    import org.apache.oozie.service.JPAService;
028    import org.apache.oozie.service.WorkflowStoreService;
029    import org.apache.oozie.service.WorkflowAppService;
030    import org.apache.oozie.service.HadoopAccessorService;
031    import org.apache.oozie.service.Services;
032    import org.apache.oozie.service.DagXLogInfoService;
033    import org.apache.oozie.util.ConfigUtils;
034    import org.apache.oozie.util.ELUtils;
035    import org.apache.oozie.util.LogUtils;
036    import org.apache.oozie.util.XLog;
037    import org.apache.oozie.util.ParamChecker;
038    import org.apache.oozie.util.XConfiguration;
039    import org.apache.oozie.util.XmlUtils;
040    import org.apache.oozie.command.CommandException;
041    import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
042    import org.apache.oozie.executor.jpa.JPAExecutorException;
043    import org.apache.oozie.service.ELService;
044    import org.apache.oozie.service.SchemaService;
045    import org.apache.oozie.store.StoreException;
046    import org.apache.oozie.workflow.WorkflowApp;
047    import org.apache.oozie.workflow.WorkflowException;
048    import org.apache.oozie.workflow.WorkflowInstance;
049    import org.apache.oozie.workflow.WorkflowLib;
050    import org.apache.oozie.util.ELEvaluator;
051    import org.apache.oozie.util.InstrumentUtils;
052    import org.apache.oozie.util.PropertiesUtils;
053    import org.apache.oozie.util.db.SLADbOperations;
054    import org.apache.oozie.service.SchemaService.SchemaName;
055    import org.apache.oozie.client.OozieClient;
056    import org.apache.oozie.client.WorkflowJob;
057    import org.apache.oozie.client.SLAEvent.SlaAppType;
058    import org.apache.oozie.client.rest.JsonBean;
059    import org.jdom.Element;
060    import org.jdom.Namespace;
061    
062    import java.util.ArrayList;
063    import java.util.Date;
064    import java.util.List;
065    import java.util.Map;
066    import java.util.Set;
067    import java.util.HashSet;
068    import java.io.IOException;
069    import java.net.URI;
070    
071    public class SubmitXCommand extends WorkflowXCommand<String> {
072        public static final String CONFIG_DEFAULT = "config-default.xml";
073    
074        private Configuration conf;
075        private String authToken;
076        private List<JsonBean> insertList = new ArrayList<JsonBean>();
077    
078        /**
079         * Constructor to create the workflow Submit Command.
080         *
081         * @param conf : Configuration for workflow job
082         * @param authToken : To be used for authentication
083         */
084        public SubmitXCommand(Configuration conf, String authToken) {
085            super("submit", "submit", 1);
086            this.conf = ParamChecker.notNull(conf, "conf");
087            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
088        }
089    
090        /**
091         * Constructor to create the workflow Submit Command.
092         *
093         * @param dryrun : if dryrun
094         * @param conf : Configuration for workflow job
095         * @param authToken : To be used for authentication
096         */
097        public SubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
098            this(conf, authToken);
099            this.dryrun = dryrun;
100        }
101    
102        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
103        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
104    
105        static {
106            String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
107                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
108                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
109                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
110            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
111    
112            String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
113            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
114            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
115        }
116    
117        @Override
118        protected String execute() throws CommandException {
119            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
120            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
121            try {
122                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
123                WorkflowApp app = wps.parseDef(conf, authToken);
124                XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
125                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
126    
127                String user = conf.get(OozieClient.USER_NAME);
128                String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
129                URI uri = new URI(conf.get(OozieClient.APP_PATH));
130                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
131                Configuration fsConf = has.createJobConf(uri.getAuthority());
132                FileSystem fs = has.createFileSystem(user, uri, fsConf);
133    
134                Path configDefault = null;
135                // app path could be a directory
136                Path path = new Path(uri.getPath());
137                if (!fs.isFile(path)) {
138                    configDefault = new Path(path, CONFIG_DEFAULT);
139                } else {
140                    configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
141                }
142    
143                if (fs.exists(configDefault)) {
144                    try {
145                        Configuration defaultConf = new XConfiguration(fs.open(configDefault));
146                        PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
147                        XConfiguration.injectDefaults(defaultConf, conf);
148                    }
149                    catch (IOException ex) {
150                        throw new IOException("default configuration file, " + ex.getMessage(), ex);
151                    }
152                }
153    
154                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
155    
156                // Resolving all variables in the job properties.
157                // This ensures the Hadoop Configuration semantics is preserved.
158                XConfiguration resolvedVarsConf = new XConfiguration();
159                for (Map.Entry<String, String> entry : conf) {
160                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
161                }
162                conf = resolvedVarsConf;
163    
164                WorkflowInstance wfInstance;
165                try {
166                    wfInstance = workflowLib.createInstance(app, conf);
167                }
168                catch (WorkflowException e) {
169                    throw new StoreException(e);
170                }
171    
172                Configuration conf = wfInstance.getConf();
173                // System.out.println("WF INSTANCE CONF:");
174                // System.out.println(XmlUtils.prettyPrint(conf).toString());
175    
176                WorkflowJobBean workflow = new WorkflowJobBean();
177                workflow.setId(wfInstance.getId());
178                workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
179                workflow.setAppPath(conf.get(OozieClient.APP_PATH));
180                workflow.setConf(XmlUtils.prettyPrint(conf).toString());
181                workflow.setProtoActionConf(protoActionConf.toXmlString());
182                workflow.setCreatedTime(new Date());
183                workflow.setLastModifiedTime(new Date());
184                workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
185                workflow.setStatus(WorkflowJob.Status.PREP);
186                workflow.setRun(0);
187                workflow.setUser(conf.get(OozieClient.USER_NAME));
188                workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
189                workflow.setAuthToken(authToken);
190                workflow.setWorkflowInstance(wfInstance);
191                workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
192    
193                LogUtils.setLogInfo(workflow, logInfo);
194                LOG = XLog.resetPrefix(LOG);
195                LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
196                Element wfElem = XmlUtils.parseXml(app.getDefinition());
197                ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
198                String jobSlaXml = verifySlaElements(wfElem, evalSla);
199                if (!dryrun) {
200                    writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), LOG);
201                    workflow.setSlaXml(jobSlaXml);
202                    // System.out.println("SlaXml :"+ slaXml);
203    
204                    //store.insertWorkflow(workflow);
205                    insertList.add(workflow);
206                    JPAService jpaService = Services.get().get(JPAService.class);
207                    if (jpaService != null) {
208                        try {
209                            jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
210                        }
211                        catch (JPAExecutorException je) {
212                            throw new CommandException(je);
213                        }
214                    }
215                    else {
216                        LOG.error(ErrorCode.E0610);
217                        return null;
218                    }
219    
220                    return workflow.getId();
221                }
222                else {
223                    return "OK";
224                }
225            }
226            catch (WorkflowException ex) {
227                throw new CommandException(ex);
228            }
229            catch (HadoopAccessorException ex) {
230                throw new CommandException(ex);
231            }
232            catch (Exception ex) {
233                throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
234            }
235        }
236    
237        private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
238            String jobSlaXml = "";
239            // Validate WF job
240            Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
241            if (eSla != null) {
242                jobSlaXml = resolveSla(eSla, evalSla);
243            }
244    
245            // Validate all actions
246            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
247                eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
248                if (eSla != null) {
249                    resolveSla(eSla, evalSla);
250                }
251            }
252            return jobSlaXml;
253        }
254    
255        private void writeSLARegistration(String slaXml, String id, String user, String group, XLog log)
256                throws CommandException {
257            try {
258                if (slaXml != null && slaXml.length() > 0) {
259                    Element eSla = XmlUtils.parseXml(slaXml);
260                    SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, id,
261                            SlaAppType.WORKFLOW_JOB, user, group, log);
262                    if(slaEvent != null) {
263                        insertList.add(slaEvent);
264                    }
265                }
266            }
267            catch (Exception e) {
268                e.printStackTrace();
269                throw new CommandException(ErrorCode.E1007, "workflow " + id, e.getMessage(), e);
270            }
271        }
272    
273        /**
274         * Resolve variables in sla xml element.
275         *
276         * @param eSla sla xml element
277         * @param evalSla sla evaluator
278         * @return sla xml string after evaluation
279         * @throws CommandException
280         */
281        public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
282            // EL evaluation
283            String slaXml = XmlUtils.prettyPrint(eSla).toString();
284            try {
285                slaXml = XmlUtils.removeComments(slaXml);
286                slaXml = evalSla.evaluate(slaXml, String.class);
287                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
288                return slaXml;
289            }
290            catch (Exception e) {
291                throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e);
292            }
293        }
294    
295        /**
296         * Create an EL evaluator for a given group.
297         *
298         * @param conf configuration variable
299         * @param group group variable
300         * @return the evaluator created for the group
301         */
302        public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
303            ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
304            for (Map.Entry<String, String> entry : conf) {
305                eval.setVariable(entry.getKey(), entry.getValue());
306            }
307            return eval;
308        }
309    
310        @Override
311        public String getEntityKey() {
312            return null;
313        }
314    
315        @Override
316        protected boolean isLockRequired() {
317            return false;
318        }
319    
320        @Override
321        protected void loadState() {
322    
323        }
324    
325        @Override
326        protected void verifyPrecondition() throws CommandException {
327    
328        }
329    
330    }