This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.Path;
022 import org.apache.hadoop.fs.FileSystem;
023 import org.apache.oozie.WorkflowJobBean;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.service.HadoopAccessorException;
026 import org.apache.oozie.service.WorkflowStoreService;
027 import org.apache.oozie.service.WorkflowAppService;
028 import org.apache.oozie.service.HadoopAccessorService;
029 import org.apache.oozie.service.Services;
030 import org.apache.oozie.service.DagXLogInfoService;
031 import org.apache.oozie.util.XLog;
032 import org.apache.oozie.util.ParamChecker;
033 import org.apache.oozie.util.XConfiguration;
034 import org.apache.oozie.util.XmlUtils;
035 import org.apache.oozie.command.Command;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.command.coord.CoordSubmitCommand;
038 import org.apache.oozie.coord.CoordELFunctions;
039 import org.apache.oozie.coord.CoordinatorJobException;
040 import org.apache.oozie.service.ELService;
041 import org.apache.oozie.service.SchemaService;
042 import org.apache.oozie.service.WorkflowAppService;
043 import org.apache.oozie.service.DagXLogInfoService;
044 import org.apache.oozie.service.WorkflowStoreService;
045 import org.apache.oozie.store.StoreException;
046 import org.apache.oozie.store.Store;
047 import org.apache.oozie.store.WorkflowStore;
048 import org.apache.oozie.workflow.WorkflowApp;
049 import org.apache.oozie.workflow.WorkflowException;
050 import org.apache.oozie.workflow.WorkflowInstance;
051 import org.apache.oozie.workflow.WorkflowLib;
052 import org.apache.oozie.util.ELEvaluator;
053 import org.apache.oozie.util.ParamChecker;
054 import org.apache.oozie.util.PropertiesUtils;
055 import org.apache.oozie.util.XLog;
056 import org.apache.oozie.util.XmlUtils;
057 import org.apache.oozie.util.XConfiguration;
058 import org.apache.oozie.util.db.SLADbOperations;
059 import org.apache.oozie.service.Services;
060 import org.apache.oozie.service.SchemaService.SchemaName;
061 import org.apache.oozie.client.OozieClient;
062 import org.apache.oozie.client.WorkflowJob;
063 import org.apache.oozie.client.SLAEvent.SlaAppType;
064 import org.jdom.Element;
065 import org.jdom.JDOMException;
066 import org.jdom.Namespace;
067
068 import java.util.Date;
069 import java.util.List;
070 import java.util.Map;
071 import java.util.Set;
072 import java.util.HashSet;
073 import java.util.Map;
074 import java.io.IOException;
075
076 public class SubmitCommand extends WorkflowCommand<String> {
077 public static final String CONFIG_DEFAULT = "config-default.xml";
078
079 private Configuration conf;
080 private String authToken;
081
082 public SubmitCommand(Configuration conf, String authToken) {
083 super("submit", "submit", 1, XLog.STD);
084 this.conf = ParamChecker.notNull(conf, "conf");
085 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
086 }
087
088 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
089 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
090
091 static {
092 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
093 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
094 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
095 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
096 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
097
098 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
099 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME};
100 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
101 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
102 }
103
104 @Override
105 protected String call(WorkflowStore store) throws StoreException, CommandException {
106 incrJobCounter(1);
107 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
108 try {
109 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
110 WorkflowApp app = wps.parseDef(conf, authToken);
111 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
112 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
113
114 Path configDefault = new Path(new Path(conf.get(OozieClient.APP_PATH)).getParent(), CONFIG_DEFAULT);
115 String user = conf.get(OozieClient.USER_NAME);
116 String group = conf.get(OozieClient.GROUP_NAME);
117 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, group,
118 configDefault.toUri(), new Configuration());
119
120 if (fs.exists(configDefault)) {
121 try {
122 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
123 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
124 XConfiguration.injectDefaults(defaultConf, conf);
125 }
126 catch (IOException ex) {
127 throw new IOException("default configuration file, " + ex.getMessage(), ex);
128 }
129 }
130
131 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
132
133 // Resolving all variables in the job properties.
134 // This ensures the Hadoop Configuration semantics is preserved.
135 XConfiguration resolvedVarsConf = new XConfiguration();
136 for (Map.Entry<String, String> entry : conf) {
137 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
138 }
139 conf = resolvedVarsConf;
140
141 WorkflowInstance wfInstance;
142 try {
143 wfInstance = workflowLib.createInstance(app, conf);
144 }
145 catch (WorkflowException e) {
146 throw new StoreException(e);
147 }
148
149 Configuration conf = wfInstance.getConf();
150 // System.out.println("WF INSTANCE CONF:");
151 // System.out.println(XmlUtils.prettyPrint(conf).toString());
152
153 WorkflowJobBean workflow = new WorkflowJobBean();
154 workflow.setId(wfInstance.getId());
155 workflow.setAppName(app.getName());
156 workflow.setAppPath(conf.get(OozieClient.APP_PATH));
157 workflow.setConf(XmlUtils.prettyPrint(conf).toString());
158 workflow.setProtoActionConf(protoActionConf.toXmlString());
159 workflow.setCreatedTime(new Date());
160 workflow.setLastModifiedTime(new Date());
161 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
162 workflow.setStatus(WorkflowJob.Status.PREP);
163 workflow.setRun(0);
164 workflow.setUser(conf.get(OozieClient.USER_NAME));
165 workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
166 workflow.setAuthToken(authToken);
167 workflow.setWorkflowInstance(wfInstance);
168 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
169
170 setLogInfo(workflow);
171 Element wfElem = XmlUtils.parseXml(app.getDefinition());
172 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
173 String jobSlaXml = verifySlaElements(wfElem, evalSla);
174 writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), store);
175 workflow.setSlaXml(jobSlaXml);
176 // System.out.println("SlaXml :"+ slaXml);
177
178 store.insertWorkflow(workflow);
179
180 // Configuration conf1 = workflow.getWorkflowInstance().getConf();
181 // System.out.println("WF1 INSTANCE CONF:");
182 // System.out.println(XmlUtils.prettyPrint(conf1).toString());
183 // Add WF_JOB SLA Registration event
184
185 return workflow.getId();
186 }
187 catch (WorkflowException ex) {
188 throw new CommandException(ex);
189 }
190 catch (HadoopAccessorException ex) {
191 throw new CommandException(ex);
192 }
193 catch (Exception ex) {
194 throw new CommandException(ErrorCode.E0803, ex);
195 }
196 }
197
198 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
199 String jobSlaXml = "";
200 // String prefix = XmlUtils.getNamespacePrefix(eWfJob,
201 // SchemaService.SLA_NAME_SPACE_URI);
202 // Validate WF job
203 Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
204 if (eSla != null) {
205 jobSlaXml = resolveSla(eSla, evalSla);
206 }
207
208 // Validate all actions
209 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
210 eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
211 if (eSla != null) {
212 resolveSla(eSla, evalSla);
213 }
214 }
215 return jobSlaXml;
216 }
217
218 private void writeSLARegistration(String slaXml, String id, String user, String group, Store store)
219 throws CommandException {
220 try {
221 if (slaXml != null && slaXml.length() > 0) {
222 Element eSla = XmlUtils.parseXml(slaXml);
223 SLADbOperations.writeSlaRegistrationEvent(eSla, store, id, SlaAppType.WORKFLOW_JOB, user, group);
224 }
225 }
226 catch (Exception e) {
227 // TODO Auto-generated catch block
228 e.printStackTrace();
229 throw new CommandException(ErrorCode.E1007, "workflow " + id, e);
230 }
231 }
232
233 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
234 // EL evaluation
235 String slaXml = XmlUtils.prettyPrint(eSla).toString();
236 try {
237 slaXml = XmlUtils.removeComments(slaXml);
238 slaXml = evalSla.evaluate(slaXml, String.class);
239 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
240 return slaXml;
241 }
242 catch (Exception e) {
243 throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e);
244 }
245 }
246
247 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
248 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
249 for (Map.Entry<String, String> entry : conf) {
250 eval.setVariable(entry.getKey(), entry.getValue());
251 }
252 return eval;
253 }
254
255 }