This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.Path;
022 import org.apache.hadoop.fs.FileSystem;
023 import org.apache.oozie.AppType;
024 import org.apache.oozie.SLAEventBean;
025 import org.apache.oozie.WorkflowJobBean;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
028 import org.apache.oozie.service.HadoopAccessorException;
029 import org.apache.oozie.service.JPAService;
030 import org.apache.oozie.service.UUIDService;
031 import org.apache.oozie.service.WorkflowStoreService;
032 import org.apache.oozie.service.WorkflowAppService;
033 import org.apache.oozie.service.HadoopAccessorService;
034 import org.apache.oozie.service.Services;
035 import org.apache.oozie.service.DagXLogInfoService;
036 import org.apache.oozie.util.ConfigUtils;
037 import org.apache.oozie.util.ELUtils;
038 import org.apache.oozie.util.LogUtils;
039 import org.apache.oozie.sla.SLAOperations;
040 import org.apache.oozie.util.XLog;
041 import org.apache.oozie.util.ParamChecker;
042 import org.apache.oozie.util.XConfiguration;
043 import org.apache.oozie.util.XmlUtils;
044 import org.apache.oozie.command.CommandException;
045 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
046 import org.apache.oozie.executor.jpa.JPAExecutorException;
047 import org.apache.oozie.service.ELService;
048 import org.apache.oozie.store.StoreException;
049 import org.apache.oozie.workflow.WorkflowApp;
050 import org.apache.oozie.workflow.WorkflowException;
051 import org.apache.oozie.workflow.WorkflowInstance;
052 import org.apache.oozie.workflow.WorkflowLib;
053 import org.apache.oozie.util.ELEvaluator;
054 import org.apache.oozie.util.InstrumentUtils;
055 import org.apache.oozie.util.PropertiesUtils;
056 import org.apache.oozie.util.db.SLADbOperations;
057 import org.apache.oozie.service.SchemaService.SchemaName;
058 import org.apache.oozie.client.OozieClient;
059 import org.apache.oozie.client.WorkflowJob;
060 import org.apache.oozie.client.SLAEvent.SlaAppType;
061 import org.apache.oozie.client.rest.JsonBean;
062 import org.jdom.Element;
063 import java.util.ArrayList;
064 import java.util.Date;
065 import java.util.List;
066 import java.util.Map;
067 import java.util.Set;
068 import java.util.HashSet;
069 import java.io.IOException;
070 import java.net.URI;
071
072 @SuppressWarnings("deprecation")
073 public class SubmitXCommand extends WorkflowXCommand<String> {
074 public static final String CONFIG_DEFAULT = "config-default.xml";
075
076 private Configuration conf;
077 private List<JsonBean> insertList = new ArrayList<JsonBean>();
078 private String parentId;
079
080 /**
081 * Constructor to create the workflow Submit Command.
082 *
083 * @param conf : Configuration for workflow job
084 */
085 public SubmitXCommand(Configuration conf) {
086 super("submit", "submit", 1);
087 this.conf = ParamChecker.notNull(conf, "conf");
088 }
089
090 /**
091 * Constructor for submitting wf through coordinator
092 *
093 * @param conf : Configuration for workflow job
094 * @param parentId: the coord action id
095 */
096 public SubmitXCommand(Configuration conf, String parentId) {
097 this(conf);
098 this.parentId = parentId;
099 }
100
101 /**
102 * Constructor to create the workflow Submit Command.
103 *
104 * @param dryrun : if dryrun
105 * @param conf : Configuration for workflow job
106 * @param authToken : To be used for authentication
107 */
108 public SubmitXCommand(boolean dryrun, Configuration conf) {
109 this(conf);
110 this.dryrun = dryrun;
111 }
112
113 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
114 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
115
116 static {
117 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
118 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
119 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
120 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
121 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
122
123 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
124 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
125 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
126 }
127
128 @Override
129 protected String execute() throws CommandException {
130 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
131 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
132 try {
133 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
134 WorkflowApp app = wps.parseDef(conf);
135 XConfiguration protoActionConf = wps.createProtoActionConf(conf, true);
136 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
137
138 String user = conf.get(OozieClient.USER_NAME);
139 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
140 URI uri = new URI(conf.get(OozieClient.APP_PATH));
141 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
142 Configuration fsConf = has.createJobConf(uri.getAuthority());
143 FileSystem fs = has.createFileSystem(user, uri, fsConf);
144
145 Path configDefault = null;
146 // app path could be a directory
147 Path path = new Path(uri.getPath());
148 if (!fs.isFile(path)) {
149 configDefault = new Path(path, CONFIG_DEFAULT);
150 } else {
151 configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
152 }
153
154 if (fs.exists(configDefault)) {
155 try {
156 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
157 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
158 XConfiguration.injectDefaults(defaultConf, conf);
159 }
160 catch (IOException ex) {
161 throw new IOException("default configuration file, " + ex.getMessage(), ex);
162 }
163 }
164
165 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
166
167 // Resolving all variables in the job properties.
168 // This ensures the Hadoop Configuration semantics is preserved.
169 XConfiguration resolvedVarsConf = new XConfiguration();
170 for (Map.Entry<String, String> entry : conf) {
171 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
172 }
173 conf = resolvedVarsConf;
174
175 WorkflowInstance wfInstance;
176 try {
177 wfInstance = workflowLib.createInstance(app, conf);
178 }
179 catch (WorkflowException e) {
180 throw new StoreException(e);
181 }
182
183 Configuration conf = wfInstance.getConf();
184 // System.out.println("WF INSTANCE CONF:");
185 // System.out.println(XmlUtils.prettyPrint(conf).toString());
186
187 WorkflowJobBean workflow = new WorkflowJobBean();
188 workflow.setId(wfInstance.getId());
189 workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
190 workflow.setAppPath(conf.get(OozieClient.APP_PATH));
191 workflow.setConf(XmlUtils.prettyPrint(conf).toString());
192 workflow.setProtoActionConf(protoActionConf.toXmlString());
193 workflow.setCreatedTime(new Date());
194 workflow.setLastModifiedTime(new Date());
195 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
196 workflow.setStatus(WorkflowJob.Status.PREP);
197 workflow.setRun(0);
198 workflow.setUser(conf.get(OozieClient.USER_NAME));
199 workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
200 workflow.setWorkflowInstance(wfInstance);
201 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
202 // Set parent id if it doesn't already have one (for subworkflows)
203 if (workflow.getParentId() == null) {
204 workflow.setParentId(conf.get(SubWorkflowActionExecutor.PARENT_ID));
205 }
206 // Set to coord action Id if workflow submitted through coordinator
207 if (workflow.getParentId() == null) {
208 workflow.setParentId(parentId);
209 }
210
211 LogUtils.setLogInfo(workflow, logInfo);
212 LOG = XLog.resetPrefix(LOG);
213 LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
214 Element wfElem = XmlUtils.parseXml(app.getDefinition());
215 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
216 String jobSlaXml = verifySlaElements(wfElem, evalSla);
217 if (!dryrun) {
218 writeSLARegistration(wfElem, jobSlaXml, workflow.getId(), workflow.getParentId(), workflow.getUser(),
219 workflow.getGroup(), workflow.getAppName(), LOG, evalSla);
220 workflow.setSlaXml(jobSlaXml);
221 // System.out.println("SlaXml :"+ slaXml);
222
223 //store.insertWorkflow(workflow);
224 insertList.add(workflow);
225 JPAService jpaService = Services.get().get(JPAService.class);
226 if (jpaService != null) {
227 try {
228 jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
229 }
230 catch (JPAExecutorException je) {
231 throw new CommandException(je);
232 }
233 }
234 else {
235 LOG.error(ErrorCode.E0610);
236 return null;
237 }
238
239 return workflow.getId();
240 }
241 else {
242 return "OK";
243 }
244 }
245 catch (WorkflowException ex) {
246 throw new CommandException(ex);
247 }
248 catch (HadoopAccessorException ex) {
249 throw new CommandException(ex);
250 }
251 catch (Exception ex) {
252 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
253 }
254 }
255
256 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
257 String jobSlaXml = "";
258 // Validate WF job
259 Element eSla = XmlUtils.getSLAElement(eWfJob);
260 if (eSla != null) {
261 jobSlaXml = resolveSla(eSla, evalSla);
262 }
263
264 // Validate all actions
265 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
266 eSla = XmlUtils.getSLAElement(eWfJob);
267 if (eSla != null) {
268 resolveSla(eSla, evalSla);
269 }
270 }
271 return jobSlaXml;
272 }
273
274 private void writeSLARegistration(Element eWfJob, String slaXml, String jobId, String parentId, String user,
275 String group, String appName, XLog log, ELEvaluator evalSla) throws CommandException {
276 try {
277 if (slaXml != null && slaXml.length() > 0) {
278 Element eSla = XmlUtils.parseXml(slaXml);
279 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, jobId,
280 SlaAppType.WORKFLOW_JOB, user, group, log);
281 if(slaEvent != null) {
282 insertList.add(slaEvent);
283 }
284 // insert into new table
285 SLAOperations.createSlaRegistrationEvent(eSla, jobId, parentId, AppType.WORKFLOW_JOB, user, appName,
286 log, false);
287 }
288 // Add sla for wf actions
289 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
290 Element actionSla = XmlUtils.getSLAElement(action);
291 if (actionSla != null) {
292 String actionSlaXml = SubmitXCommand.resolveSla(actionSla, evalSla);
293 actionSla = XmlUtils.parseXml(actionSlaXml);
294 String actionId = Services.get().get(UUIDService.class)
295 .generateChildId(jobId, action.getAttributeValue("name") + "");
296 SLAOperations.createSlaRegistrationEvent(actionSla, actionId, jobId, AppType.WORKFLOW_ACTION,
297 user, appName, log, false);
298 }
299 }
300 }
301 catch (Exception e) {
302 e.printStackTrace();
303 throw new CommandException(ErrorCode.E1007, "workflow " + jobId, e.getMessage(), e);
304 }
305 }
306
307 /**
308 * Resolve variables in sla xml element.
309 *
310 * @param eSla sla xml element
311 * @param evalSla sla evaluator
312 * @return sla xml string after evaluation
313 * @throws CommandException
314 */
315 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
316 // EL evaluation
317 String slaXml = XmlUtils.prettyPrint(eSla).toString();
318 try {
319 slaXml = XmlUtils.removeComments(slaXml);
320 slaXml = evalSla.evaluate(slaXml, String.class);
321 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
322 return slaXml;
323 }
324 catch (Exception e) {
325 throw new CommandException(ErrorCode.E1004, "Validation error :" + e.getMessage(), e);
326 }
327 }
328
329 /**
330 * Create an EL evaluator for a given group.
331 *
332 * @param conf configuration variable
333 * @param group group variable
334 * @return the evaluator created for the group
335 */
336 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
337 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
338 for (Map.Entry<String, String> entry : conf) {
339 eval.setVariable(entry.getKey(), entry.getValue());
340 }
341 return eval;
342 }
343
344 @Override
345 public String getEntityKey() {
346 return null;
347 }
348
349 @Override
350 protected boolean isLockRequired() {
351 return false;
352 }
353
354 @Override
355 protected void loadState() {
356
357 }
358
359 @Override
360 protected void verifyPrecondition() throws CommandException {
361
362 }
363
364 }