// This project has retired. For details please refer to its Attic page.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.ELUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.db.SLADbOperations;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.rest.JsonBean;
import org.jdom.Element;
import org.jdom.Namespace;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

071 public class SubmitXCommand extends WorkflowXCommand<String> {
072 public static final String CONFIG_DEFAULT = "config-default.xml";
073
074 private Configuration conf;
075 private String authToken;
076 private List<JsonBean> insertList = new ArrayList<JsonBean>();
077
078 public SubmitXCommand(Configuration conf, String authToken) {
079 super("submit", "submit", 1);
080 this.conf = ParamChecker.notNull(conf, "conf");
081 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
082 }
083
084 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
085 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
086
087 static {
088 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
089 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
090 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
091 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
092 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
093
094 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
095 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
096 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
097 }
098
099 @Override
100 protected String execute() throws CommandException {
101 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
102 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
103 try {
104 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
105 WorkflowApp app = wps.parseDef(conf, authToken);
106 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
107 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
108
109 String user = conf.get(OozieClient.USER_NAME);
110 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
111 URI uri = new URI(conf.get(OozieClient.APP_PATH));
112 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
113 Configuration fsConf = has.createJobConf(uri.getAuthority());
114 FileSystem fs = has.createFileSystem(user, uri, fsConf);
115
116 Path configDefault = null;
117 // app path could be a directory
118 Path path = new Path(uri.getPath());
119 if (!fs.isFile(path)) {
120 configDefault = new Path(path, CONFIG_DEFAULT);
121 } else {
122 configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
123 }
124
125 if (fs.exists(configDefault)) {
126 try {
127 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
128 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
129 XConfiguration.injectDefaults(defaultConf, conf);
130 }
131 catch (IOException ex) {
132 throw new IOException("default configuration file, " + ex.getMessage(), ex);
133 }
134 }
135
136 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
137
138 // Resolving all variables in the job properties.
139 // This ensures the Hadoop Configuration semantics is preserved.
140 XConfiguration resolvedVarsConf = new XConfiguration();
141 for (Map.Entry<String, String> entry : conf) {
142 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
143 }
144 conf = resolvedVarsConf;
145
146 WorkflowInstance wfInstance;
147 try {
148 wfInstance = workflowLib.createInstance(app, conf);
149 }
150 catch (WorkflowException e) {
151 throw new StoreException(e);
152 }
153
154 Configuration conf = wfInstance.getConf();
155 // System.out.println("WF INSTANCE CONF:");
156 // System.out.println(XmlUtils.prettyPrint(conf).toString());
157
158 WorkflowJobBean workflow = new WorkflowJobBean();
159 workflow.setId(wfInstance.getId());
160 workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
161 workflow.setAppPath(conf.get(OozieClient.APP_PATH));
162 workflow.setConf(XmlUtils.prettyPrint(conf).toString());
163 workflow.setProtoActionConf(protoActionConf.toXmlString());
164 workflow.setCreatedTime(new Date());
165 workflow.setLastModifiedTime(new Date());
166 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
167 workflow.setStatus(WorkflowJob.Status.PREP);
168 workflow.setRun(0);
169 workflow.setUser(conf.get(OozieClient.USER_NAME));
170 workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
171 workflow.setAuthToken(authToken);
172 workflow.setWorkflowInstance(wfInstance);
173 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
174
175 LogUtils.setLogInfo(workflow, logInfo);
176 LOG = XLog.resetPrefix(LOG);
177 LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
178 Element wfElem = XmlUtils.parseXml(app.getDefinition());
179 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
180 String jobSlaXml = verifySlaElements(wfElem, evalSla);
181 writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), LOG);
182 workflow.setSlaXml(jobSlaXml);
183 // System.out.println("SlaXml :"+ slaXml);
184
185 //store.insertWorkflow(workflow);
186 insertList.add(workflow);
187 JPAService jpaService = Services.get().get(JPAService.class);
188 if (jpaService != null) {
189 try {
190 jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
191 }
192 catch (JPAExecutorException je) {
193 throw new CommandException(je);
194 }
195 }
196 else {
197 LOG.error(ErrorCode.E0610);
198 return null;
199 }
200
201 return workflow.getId();
202 }
203 catch (WorkflowException ex) {
204 throw new CommandException(ex);
205 }
206 catch (HadoopAccessorException ex) {
207 throw new CommandException(ex);
208 }
209 catch (Exception ex) {
210 throw new CommandException(ErrorCode.E0803, ex);
211 }
212 }
213
214 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
215 String jobSlaXml = "";
216 // Validate WF job
217 Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
218 if (eSla != null) {
219 jobSlaXml = resolveSla(eSla, evalSla);
220 }
221
222 // Validate all actions
223 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
224 eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
225 if (eSla != null) {
226 resolveSla(eSla, evalSla);
227 }
228 }
229 return jobSlaXml;
230 }
231
232 private void writeSLARegistration(String slaXml, String id, String user, String group, XLog log)
233 throws CommandException {
234 try {
235 if (slaXml != null && slaXml.length() > 0) {
236 Element eSla = XmlUtils.parseXml(slaXml);
237 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, id,
238 SlaAppType.WORKFLOW_JOB, user, group, log);
239 if(slaEvent != null) {
240 insertList.add(slaEvent);
241 }
242 }
243 }
244 catch (Exception e) {
245 e.printStackTrace();
246 throw new CommandException(ErrorCode.E1007, "workflow " + id, e);
247 }
248 }
249
250 /**
251 * Resolve variables in sla xml element.
252 *
253 * @param eSla sla xml element
254 * @param evalSla sla evaluator
255 * @return sla xml string after evaluation
256 * @throws CommandException
257 */
258 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
259 // EL evaluation
260 String slaXml = XmlUtils.prettyPrint(eSla).toString();
261 try {
262 slaXml = XmlUtils.removeComments(slaXml);
263 slaXml = evalSla.evaluate(slaXml, String.class);
264 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
265 return slaXml;
266 }
267 catch (Exception e) {
268 throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e);
269 }
270 }
271
272 /**
273 * Create an EL evaluator for a given group.
274 *
275 * @param conf configuration variable
276 * @param group group variable
277 * @return the evaluator created for the group
278 */
279 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
280 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
281 for (Map.Entry<String, String> entry : conf) {
282 eval.setVariable(entry.getKey(), entry.getValue());
283 }
284 return eval;
285 }
286
287 @Override
288 public String getEntityKey() {
289 return null;
290 }
291
292 @Override
293 protected boolean isLockRequired() {
294 return false;
295 }
296
297 @Override
298 protected void loadState() {
299
300 }
301
302 @Override
303 protected void verifyPrecondition() throws CommandException {
304
305 }
306
307 }