001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.fs.FileSystem;
023    import org.apache.oozie.WorkflowJobBean;
024    import org.apache.oozie.ErrorCode;
025    import org.apache.oozie.service.HadoopAccessorException;
026    import org.apache.oozie.service.JPAService;
027    import org.apache.oozie.service.WorkflowStoreService;
028    import org.apache.oozie.service.WorkflowAppService;
029    import org.apache.oozie.service.HadoopAccessorService;
030    import org.apache.oozie.service.Services;
031    import org.apache.oozie.service.DagXLogInfoService;
032    import org.apache.oozie.util.ConfigUtils;
033    import org.apache.oozie.util.LogUtils;
034    import org.apache.oozie.util.XLog;
035    import org.apache.oozie.util.ParamChecker;
036    import org.apache.oozie.util.XConfiguration;
037    import org.apache.oozie.util.XmlUtils;
038    import org.apache.oozie.command.CommandException;
039    import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
040    import org.apache.oozie.service.ELService;
041    import org.apache.oozie.service.SchemaService;
042    import org.apache.oozie.store.StoreException;
043    import org.apache.oozie.workflow.WorkflowApp;
044    import org.apache.oozie.workflow.WorkflowException;
045    import org.apache.oozie.workflow.WorkflowInstance;
046    import org.apache.oozie.workflow.WorkflowLib;
047    import org.apache.oozie.util.ELEvaluator;
048    import org.apache.oozie.util.InstrumentUtils;
049    import org.apache.oozie.util.PropertiesUtils;
050    import org.apache.oozie.util.db.SLADbOperations;
051    import org.apache.oozie.service.SchemaService.SchemaName;
052    import org.apache.oozie.client.OozieClient;
053    import org.apache.oozie.client.WorkflowJob;
054    import org.apache.oozie.client.SLAEvent.SlaAppType;
055    import org.jdom.Element;
056    import org.jdom.Namespace;
057    
058    import java.util.Date;
059    import java.util.List;
060    import java.util.Map;
061    import java.util.Set;
062    import java.util.HashSet;
063    import java.io.IOException;
064    import java.net.URI;
065    
066    public class SubmitXCommand extends WorkflowXCommand<String> {
067        public static final String CONFIG_DEFAULT = "config-default.xml";
068    
069        private Configuration conf;
070        private String authToken;
071    
072        public SubmitXCommand(Configuration conf, String authToken) {
073            super("submit", "submit", 1);
074            this.conf = ParamChecker.notNull(conf, "conf");
075            this.authToken = ParamChecker.notEmpty(authToken, "authToken");
076        }
077    
078        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
079        private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
080    
081        static {
082            String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
083                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
084                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
085                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
086            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
087    
088            String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
089            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
090            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
091        }
092    
093        @Override
094        protected String execute() throws CommandException {
095            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
096            WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
097            try {
098                XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
099                WorkflowApp app = wps.parseDef(conf, authToken);
100                XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
101                WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
102    
103                String user = conf.get(OozieClient.USER_NAME);
104                String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
105                URI uri = new URI(conf.get(OozieClient.APP_PATH));
106                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
107                Configuration fsConf = has.createJobConf(uri.getAuthority());
108                FileSystem fs = has.createFileSystem(user, uri, fsConf);
109    
110                Path configDefault = null;
111                // app path could be a directory
112                Path path = new Path(uri.getPath());
113                if (!fs.isFile(path)) {
114                    configDefault = new Path(path, CONFIG_DEFAULT);
115                } else {
116                    configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
117                }
118    
119                if (fs.exists(configDefault)) {
120                    try {
121                        Configuration defaultConf = new XConfiguration(fs.open(configDefault));
122                        PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
123                        XConfiguration.injectDefaults(defaultConf, conf);
124                    }
125                    catch (IOException ex) {
126                        throw new IOException("default configuration file, " + ex.getMessage(), ex);
127                    }
128                }
129    
130                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
131    
132                // Resolving all variables in the job properties.
133                // This ensures the Hadoop Configuration semantics is preserved.
134                XConfiguration resolvedVarsConf = new XConfiguration();
135                for (Map.Entry<String, String> entry : conf) {
136                    resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
137                }
138                conf = resolvedVarsConf;
139    
140                WorkflowInstance wfInstance;
141                try {
142                    wfInstance = workflowLib.createInstance(app, conf);
143                }
144                catch (WorkflowException e) {
145                    throw new StoreException(e);
146                }
147    
148                Configuration conf = wfInstance.getConf();
149                // System.out.println("WF INSTANCE CONF:");
150                // System.out.println(XmlUtils.prettyPrint(conf).toString());
151    
152                WorkflowJobBean workflow = new WorkflowJobBean();
153                workflow.setId(wfInstance.getId());
154                workflow.setAppName(app.getName());
155                workflow.setAppPath(conf.get(OozieClient.APP_PATH));
156                workflow.setConf(XmlUtils.prettyPrint(conf).toString());
157                workflow.setProtoActionConf(protoActionConf.toXmlString());
158                workflow.setCreatedTime(new Date());
159                workflow.setLastModifiedTime(new Date());
160                workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
161                workflow.setStatus(WorkflowJob.Status.PREP);
162                workflow.setRun(0);
163                workflow.setUser(conf.get(OozieClient.USER_NAME));
164                workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
165                workflow.setAuthToken(authToken);
166                workflow.setWorkflowInstance(wfInstance);
167                workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
168    
169                LogUtils.setLogInfo(workflow, logInfo);
170                LOG = XLog.resetPrefix(LOG);
171                LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
172                Element wfElem = XmlUtils.parseXml(app.getDefinition());
173                ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
174                String jobSlaXml = verifySlaElements(wfElem, evalSla);
175                writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), LOG);
176                workflow.setSlaXml(jobSlaXml);
177                // System.out.println("SlaXml :"+ slaXml);
178    
179                //store.insertWorkflow(workflow);
180                JPAService jpaService = Services.get().get(JPAService.class);
181                if (jpaService != null) {
182                    jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow));
183                }
184                else {
185                    LOG.error(ErrorCode.E0610);
186                    return null;
187                }
188    
189                return workflow.getId();
190            }
191            catch (WorkflowException ex) {
192                throw new CommandException(ex);
193            }
194            catch (HadoopAccessorException ex) {
195                throw new CommandException(ex);
196            }
197            catch (Exception ex) {
198                throw new CommandException(ErrorCode.E0803, ex);
199            }
200        }
201    
202        private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
203            String jobSlaXml = "";
204            // Validate WF job
205            Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
206            if (eSla != null) {
207                jobSlaXml = resolveSla(eSla, evalSla);
208            }
209    
210            // Validate all actions
211            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
212                eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
213                if (eSla != null) {
214                    resolveSla(eSla, evalSla);
215                }
216            }
217            return jobSlaXml;
218        }
219    
220        private void writeSLARegistration(String slaXml, String id, String user, String group, XLog log)
221                throws CommandException {
222            try {
223                if (slaXml != null && slaXml.length() > 0) {
224                    Element eSla = XmlUtils.parseXml(slaXml);
225                    SLADbOperations.writeSlaRegistrationEvent(eSla, id, SlaAppType.WORKFLOW_JOB, user, group, log);
226                }
227            }
228            catch (Exception e) {
229                e.printStackTrace();
230                throw new CommandException(ErrorCode.E1007, "workflow " + id, e);
231            }
232        }
233    
234        /**
235         * Resolve variables in sla xml element.
236         *
237         * @param eSla sla xml element
238         * @param evalSla sla evaluator
239         * @return sla xml string after evaluation
240         * @throws CommandException
241         */
242        public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
243            // EL evaluation
244            String slaXml = XmlUtils.prettyPrint(eSla).toString();
245            try {
246                slaXml = XmlUtils.removeComments(slaXml);
247                slaXml = evalSla.evaluate(slaXml, String.class);
248                XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
249                return slaXml;
250            }
251            catch (Exception e) {
252                throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e);
253            }
254        }
255    
256        /**
257         * Create an EL evaluator for a given group.
258         *
259         * @param conf configuration variable
260         * @param group group variable
261         * @return the evaluator created for the group
262         */
263        public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
264            ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
265            for (Map.Entry<String, String> entry : conf) {
266                eval.setVariable(entry.getKey(), entry.getValue());
267            }
268            return eval;
269        }
270    
271        @Override
272        public String getEntityKey() {
273            return null;
274        }
275    
276        @Override
277        protected boolean isLockRequired() {
278            return false;
279        }
280    
281        @Override
282        protected void loadState() {
283    
284        }
285    
286        @Override
287        protected void verifyPrecondition() throws CommandException {
288    
289        }
290    
291    }