001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.oozie.WorkflowJobBean;
024    import org.apache.oozie.ErrorCode;
025    import org.apache.oozie.service.HadoopAccessorException;
026    import org.apache.oozie.service.WorkflowStoreService;
027    import org.apache.oozie.service.WorkflowAppService;
028    import org.apache.oozie.service.HadoopAccessorService;
029    import org.apache.oozie.service.Services;
030    import org.apache.oozie.service.DagXLogInfoService;
031    import org.apache.oozie.util.XLog;
032    import org.apache.oozie.util.ParamChecker;
033    import org.apache.oozie.util.XConfiguration;
034    import org.apache.oozie.util.XmlUtils;
035    import org.apache.oozie.command.Command;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.command.coord.CoordSubmitCommand;
038    import org.apache.oozie.coord.CoordELFunctions;
039    import org.apache.oozie.coord.CoordinatorJobException;
040    import org.apache.oozie.service.ELService;
041    import org.apache.oozie.service.SchemaService;
042    import org.apache.oozie.service.WorkflowAppService;
043    import org.apache.oozie.service.DagXLogInfoService;
044    import org.apache.oozie.service.WorkflowStoreService;
045    import org.apache.oozie.store.StoreException;
046    import org.apache.oozie.store.Store;
047    import org.apache.oozie.store.WorkflowStore;
048    import org.apache.oozie.workflow.WorkflowApp;
049    import org.apache.oozie.workflow.WorkflowException;
050    import org.apache.oozie.workflow.WorkflowInstance;
051    import org.apache.oozie.workflow.WorkflowLib;
052    import org.apache.oozie.util.ELEvaluator;
053    import org.apache.oozie.util.ParamChecker;
054    import org.apache.oozie.util.PropertiesUtils;
055    import org.apache.oozie.util.XLog;
056    import org.apache.oozie.util.XmlUtils;
057    import org.apache.oozie.util.XConfiguration;
058    import org.apache.oozie.util.db.SLADbOperations;
059    import org.apache.oozie.service.Services;
060    import org.apache.oozie.service.SchemaService.SchemaName;
061    import org.apache.oozie.client.OozieClient;
062    import org.apache.oozie.client.WorkflowJob;
063    import org.apache.oozie.client.SLAEvent.SlaAppType;
064    import org.jdom.Element;
065    import org.jdom.JDOMException;
066    import org.jdom.Namespace;
067    
068    import java.util.Date;
069    import java.util.List;
070    import java.util.Map;
071    import java.util.Set;
072    import java.util.HashSet;
073    import java.util.Map;
074    import java.io.IOException;
075    
076    public class SubmitCommand extends WorkflowCommand<String> {
077        public static final String CONFIG_DEFAULT = "config-default.xml";
078    
079        private Configuration conf;
080        private String authToken;
081    
082        public SubmitCommand(Configuration conf, String authToken) {
083            super("submit", "submit", 1, XLog.STD);
084            this.conf = ParamChecker.notNull(conf, "conf");
085            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
086        }
087    
088        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
089        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
090    
091        static {
092            String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
093                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
094                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
095                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
096            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
097    
098            String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
099                    WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME};
100            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
101            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
102        }
103    
104        @Override
105        protected String call(WorkflowStore store) throws StoreException, CommandException {
106            incrJobCounter(1);
107            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
108            try {
109                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
110                WorkflowApp app = wps.parseDef(conf, authToken);
111                XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
112                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
113    
114                Path configDefault = new Path(new Path(conf.get(OozieClient.APP_PATH)).getParent(), CONFIG_DEFAULT);
115                String user = conf.get(OozieClient.USER_NAME);
116                String group = conf.get(OozieClient.GROUP_NAME);
117                FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
118                                                                                                 configDefault.toUri(), new Configuration());
119    
120                if (fs.exists(configDefault)) {
121                    try {
122                        Configuration defaultConf = new XConfiguration(fs.open(configDefault));
123                        PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
124                        XConfiguration.injectDefaults(defaultConf, conf);
125                    }
126                    catch (IOException ex) {
127                        throw new IOException("default configuration file, " + ex.getMessage(), ex);
128                    }
129                }
130    
131                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
132    
133                // Resolving all variables in the job properties.
134                // This ensures the Hadoop Configuration semantics is preserved.
135                XConfiguration resolvedVarsConf = new XConfiguration();
136                for (Map.Entry<String, String> entry : conf) {
137                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
138                }
139                conf = resolvedVarsConf;
140    
141                WorkflowInstance wfInstance;
142                try {
143                    wfInstance = workflowLib.createInstance(app, conf);
144                }
145                catch (WorkflowException e) {
146                    throw new StoreException(e);
147                }
148    
149                Configuration conf = wfInstance.getConf();
150                // System.out.println("WF INSTANCE CONF:");
151                // System.out.println(XmlUtils.prettyPrint(conf).toString());
152    
153                WorkflowJobBean workflow = new WorkflowJobBean();
154                workflow.setId(wfInstance.getId());
155                workflow.setAppName(app.getName());
156                workflow.setAppPath(conf.get(OozieClient.APP_PATH));
157                workflow.setConf(XmlUtils.prettyPrint(conf).toString());
158                workflow.setProtoActionConf(protoActionConf.toXmlString());
159                workflow.setCreatedTime(new Date());
160                workflow.setLastModifiedTime(new Date());
161                workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
162                workflow.setStatus(WorkflowJob.Status.PREP);
163                workflow.setRun(0);
164                workflow.setUser(conf.get(OozieClient.USER_NAME));
165                workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
166                workflow.setAuthToken(authToken);
167                workflow.setWorkflowInstance(wfInstance);
168                workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
169    
170                setLogInfo(workflow);
171                Element wfElem = XmlUtils.parseXml(app.getDefinition());
172                ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
173                String jobSlaXml = verifySlaElements(wfElem, evalSla);
174                writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), store);
175                workflow.setSlaXml(jobSlaXml);
176                // System.out.println("SlaXml :"+ slaXml);
177    
178                store.insertWorkflow(workflow);
179    
180                // Configuration conf1 = workflow.getWorkflowInstance().getConf();
181                // System.out.println("WF1 INSTANCE CONF:");
182                // System.out.println(XmlUtils.prettyPrint(conf1).toString());
183                // Add WF_JOB SLA Registration event
184    
185                return workflow.getId();
186            }
187            catch (WorkflowException ex) {
188                throw new CommandException(ex);
189            }
190            catch (HadoopAccessorException ex) {
191                throw new CommandException(ex);
192            }
193            catch (Exception ex) {
194                throw new CommandException(ErrorCode.E0803, ex);
195            }
196        }
197    
198        private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
199            String jobSlaXml = "";
200            // String prefix = XmlUtils.getNamespacePrefix(eWfJob,
201            // SchemaService.SLA_NAME_SPACE_URI);
202            // Validate WF job
203            Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
204            if (eSla != null) {
205                jobSlaXml = resolveSla(eSla, evalSla);
206            }
207    
208            // Validate all actions
209            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
210                eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
211                if (eSla != null) {
212                    resolveSla(eSla, evalSla);
213                }
214            }
215            return jobSlaXml;
216        }
217    
218        private void writeSLARegistration(String slaXml, String id, String user, String group, Store store)
219                throws CommandException {
220            try {
221                if (slaXml != null && slaXml.length() > 0) {
222                    Element eSla = XmlUtils.parseXml(slaXml);
223                    SLADbOperations.writeSlaRegistrationEvent(eSla, store, id, SlaAppType.WORKFLOW_JOB, user, group);
224                }
225            }
226            catch (Exception e) {
227                // TODO Auto-generated catch block
228                e.printStackTrace();
229                throw new CommandException(ErrorCode.E1007, "workflow " + id, e);
230            }
231        }
232    
233        public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
234            // EL evaluation
235            String slaXml = XmlUtils.prettyPrint(eSla).toString();
236            try {
237                slaXml = XmlUtils.removeComments(slaXml);
238                slaXml = evalSla.evaluate(slaXml, String.class);
239                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
240                return slaXml;
241            }
242            catch (Exception e) {
243                throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e);
244            }
245        }
246    
247        public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
248            ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
249            for (Map.Entry<String, String> entry : conf) {
250                eval.setVariable(entry.getKey(), entry.getValue());
251            }
252            return eval;
253        }
254    
255    }