This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.Path;
022 import org.apache.hadoop.fs.FileSystem;
023 import org.apache.oozie.SLAEventBean;
024 import org.apache.oozie.WorkflowJobBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.service.HadoopAccessorException;
027 import org.apache.oozie.service.JPAService;
028 import org.apache.oozie.service.WorkflowStoreService;
029 import org.apache.oozie.service.WorkflowAppService;
030 import org.apache.oozie.service.HadoopAccessorService;
031 import org.apache.oozie.service.Services;
032 import org.apache.oozie.service.DagXLogInfoService;
033 import org.apache.oozie.util.ConfigUtils;
034 import org.apache.oozie.util.ELUtils;
035 import org.apache.oozie.util.LogUtils;
036 import org.apache.oozie.util.XLog;
037 import org.apache.oozie.util.ParamChecker;
038 import org.apache.oozie.util.XConfiguration;
039 import org.apache.oozie.util.XmlUtils;
040 import org.apache.oozie.command.CommandException;
041 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
042 import org.apache.oozie.executor.jpa.JPAExecutorException;
043 import org.apache.oozie.service.ELService;
044 import org.apache.oozie.service.SchemaService;
045 import org.apache.oozie.store.StoreException;
046 import org.apache.oozie.workflow.WorkflowApp;
047 import org.apache.oozie.workflow.WorkflowException;
048 import org.apache.oozie.workflow.WorkflowInstance;
049 import org.apache.oozie.workflow.WorkflowLib;
050 import org.apache.oozie.util.ELEvaluator;
051 import org.apache.oozie.util.InstrumentUtils;
052 import org.apache.oozie.util.PropertiesUtils;
053 import org.apache.oozie.util.db.SLADbOperations;
054 import org.apache.oozie.service.SchemaService.SchemaName;
055 import org.apache.oozie.client.OozieClient;
056 import org.apache.oozie.client.WorkflowJob;
057 import org.apache.oozie.client.SLAEvent.SlaAppType;
058 import org.apache.oozie.client.rest.JsonBean;
059 import org.jdom.Element;
060 import org.jdom.Namespace;
061
062 import java.util.ArrayList;
063 import java.util.Date;
064 import java.util.List;
065 import java.util.Map;
066 import java.util.Set;
067 import java.util.HashSet;
068 import java.io.IOException;
069 import java.net.URI;
070
071 public class SubmitXCommand extends WorkflowXCommand<String> {
072 public static final String CONFIG_DEFAULT = "config-default.xml";
073
074 private Configuration conf;
075 private String authToken;
076 private List<JsonBean> insertList = new ArrayList<JsonBean>();
077
/**
 * Builds a workflow submit command for a regular (non dry-run) submission.
 * <p>
 * Both arguments are validated through {@link ParamChecker} before being stored.
 *
 * @param conf configuration of the workflow job, checked with {@code notNull}
 * @param authToken token used for authentication, checked with {@code notEmpty}
 */
public SubmitXCommand(Configuration conf, String authToken) {
    // command name "submit", command type "submit", priority 1
    super("submit", "submit", 1);
    this.conf = ParamChecker.notNull(conf, "conf");
    this.authToken = ParamChecker.notEmpty(authToken, "authToken");
}
089
/**
 * Builds a workflow submit command, optionally in dry-run mode.
 * <p>
 * Delegates to the two-argument constructor and then records the dry-run flag. When the flag
 * is set, {@code execute()} validates the submission but returns before persisting anything.
 *
 * @param dryrun {@code true} to only validate the submission
 * @param conf configuration of the workflow job
 * @param authToken token used for authentication
 */
public SubmitXCommand(boolean dryrun, Configuration conf, String authToken) {
    this(conf, authToken);
    this.dryrun = dryrun;
}
101
/** Property names rejected when found in the application's config-default.xml. */
private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();

/** Property names rejected when found in the submitted job configuration. */
private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();

static {
    final String[] reservedForUsers = new String[] {
        PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
        PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
        PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
        PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS
    };
    final String[] reservedForDefaultsOnly = new String[] {PropertiesUtils.HADOOP_USER};

    PropertiesUtils.createPropertySet(reservedForUsers, DISALLOWED_USER_PROPERTIES);

    // The defaults file is subject to every user restriction plus the defaults-only names.
    PropertiesUtils.createPropertySet(reservedForUsers, DISALLOWED_DEFAULT_PROPERTIES);
    PropertiesUtils.createPropertySet(reservedForDefaultsOnly, DISALLOWED_DEFAULT_PROPERTIES);
}
116
117 @Override
118 protected String execute() throws CommandException {
119 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
120 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
121 try {
122 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
123 WorkflowApp app = wps.parseDef(conf, authToken);
124 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
125 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
126
127 String user = conf.get(OozieClient.USER_NAME);
128 String group = ConfigUtils.getWithDeprecatedCheck(conf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
129 URI uri = new URI(conf.get(OozieClient.APP_PATH));
130 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
131 Configuration fsConf = has.createJobConf(uri.getAuthority());
132 FileSystem fs = has.createFileSystem(user, uri, fsConf);
133
134 Path configDefault = null;
135 // app path could be a directory
136 Path path = new Path(uri.getPath());
137 if (!fs.isFile(path)) {
138 configDefault = new Path(path, CONFIG_DEFAULT);
139 } else {
140 configDefault = new Path(path.getParent(), CONFIG_DEFAULT);
141 }
142
143 if (fs.exists(configDefault)) {
144 try {
145 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
146 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
147 XConfiguration.injectDefaults(defaultConf, conf);
148 }
149 catch (IOException ex) {
150 throw new IOException("default configuration file, " + ex.getMessage(), ex);
151 }
152 }
153
154 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
155
156 // Resolving all variables in the job properties.
157 // This ensures the Hadoop Configuration semantics is preserved.
158 XConfiguration resolvedVarsConf = new XConfiguration();
159 for (Map.Entry<String, String> entry : conf) {
160 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
161 }
162 conf = resolvedVarsConf;
163
164 WorkflowInstance wfInstance;
165 try {
166 wfInstance = workflowLib.createInstance(app, conf);
167 }
168 catch (WorkflowException e) {
169 throw new StoreException(e);
170 }
171
172 Configuration conf = wfInstance.getConf();
173 // System.out.println("WF INSTANCE CONF:");
174 // System.out.println(XmlUtils.prettyPrint(conf).toString());
175
176 WorkflowJobBean workflow = new WorkflowJobBean();
177 workflow.setId(wfInstance.getId());
178 workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
179 workflow.setAppPath(conf.get(OozieClient.APP_PATH));
180 workflow.setConf(XmlUtils.prettyPrint(conf).toString());
181 workflow.setProtoActionConf(protoActionConf.toXmlString());
182 workflow.setCreatedTime(new Date());
183 workflow.setLastModifiedTime(new Date());
184 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
185 workflow.setStatus(WorkflowJob.Status.PREP);
186 workflow.setRun(0);
187 workflow.setUser(conf.get(OozieClient.USER_NAME));
188 workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
189 workflow.setAuthToken(authToken);
190 workflow.setWorkflowInstance(wfInstance);
191 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
192
193 LogUtils.setLogInfo(workflow, logInfo);
194 LOG = XLog.resetPrefix(LOG);
195 LOG.debug("Workflow record created, Status [{0}]", workflow.getStatus());
196 Element wfElem = XmlUtils.parseXml(app.getDefinition());
197 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
198 String jobSlaXml = verifySlaElements(wfElem, evalSla);
199 if (!dryrun) {
200 writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), LOG);
201 workflow.setSlaXml(jobSlaXml);
202 // System.out.println("SlaXml :"+ slaXml);
203
204 //store.insertWorkflow(workflow);
205 insertList.add(workflow);
206 JPAService jpaService = Services.get().get(JPAService.class);
207 if (jpaService != null) {
208 try {
209 jpaService.execute(new BulkUpdateInsertJPAExecutor(null, insertList));
210 }
211 catch (JPAExecutorException je) {
212 throw new CommandException(je);
213 }
214 }
215 else {
216 LOG.error(ErrorCode.E0610);
217 return null;
218 }
219
220 return workflow.getId();
221 }
222 else {
223 return "OK";
224 }
225 }
226 catch (WorkflowException ex) {
227 throw new CommandException(ex);
228 }
229 catch (HadoopAccessorException ex) {
230 throw new CommandException(ex);
231 }
232 catch (Exception ex) {
233 throw new CommandException(ErrorCode.E0803, ex.getMessage(), ex);
234 }
235 }
236
237 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
238 String jobSlaXml = "";
239 // Validate WF job
240 Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
241 if (eSla != null) {
242 jobSlaXml = resolveSla(eSla, evalSla);
243 }
244
245 // Validate all actions
246 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
247 eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
248 if (eSla != null) {
249 resolveSla(eSla, evalSla);
250 }
251 }
252 return jobSlaXml;
253 }
254
255 private void writeSLARegistration(String slaXml, String id, String user, String group, XLog log)
256 throws CommandException {
257 try {
258 if (slaXml != null && slaXml.length() > 0) {
259 Element eSla = XmlUtils.parseXml(slaXml);
260 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, id,
261 SlaAppType.WORKFLOW_JOB, user, group, log);
262 if(slaEvent != null) {
263 insertList.add(slaEvent);
264 }
265 }
266 }
267 catch (Exception e) {
268 e.printStackTrace();
269 throw new CommandException(ErrorCode.E1007, "workflow " + id, e.getMessage(), e);
270 }
271 }
272
273 /**
274 * Resolve variables in sla xml element.
275 *
276 * @param eSla sla xml element
277 * @param evalSla sla evaluator
278 * @return sla xml string after evaluation
279 * @throws CommandException
280 */
281 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
282 // EL evaluation
283 String slaXml = XmlUtils.prettyPrint(eSla).toString();
284 try {
285 slaXml = XmlUtils.removeComments(slaXml);
286 slaXml = evalSla.evaluate(slaXml, String.class);
287 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
288 return slaXml;
289 }
290 catch (Exception e) {
291 throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e);
292 }
293 }
294
295 /**
296 * Create an EL evaluator for a given group.
297 *
298 * @param conf configuration variable
299 * @param group group variable
300 * @return the evaluator created for the group
301 */
302 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
303 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
304 for (Map.Entry<String, String> entry : conf) {
305 eval.setVariable(entry.getKey(), entry.getValue());
306 }
307 return eval;
308 }
309
310 @Override
311 public String getEntityKey() {
312 return null;
313 }
314
315 @Override
316 protected boolean isLockRequired() {
317 return false;
318 }
319
320 @Override
321 protected void loadState() {
322
323 }
324
325 @Override
326 protected void verifyPrecondition() throws CommandException {
327
328 }
329
330 }