This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.hadoop.fs.Path;
022 import org.apache.hadoop.fs.FileSystem;
023 import org.apache.oozie.WorkflowJobBean;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.service.HadoopAccessorException;
026 import org.apache.oozie.service.JPAService;
027 import org.apache.oozie.service.WorkflowStoreService;
028 import org.apache.oozie.service.WorkflowAppService;
029 import org.apache.oozie.service.HadoopAccessorService;
030 import org.apache.oozie.service.Services;
031 import org.apache.oozie.service.DagXLogInfoService;
032 import org.apache.oozie.util.XLog;
033 import org.apache.oozie.util.ParamChecker;
034 import org.apache.oozie.util.XConfiguration;
035 import org.apache.oozie.util.XmlUtils;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
038 import org.apache.oozie.service.ELService;
039 import org.apache.oozie.service.SchemaService;
040 import org.apache.oozie.store.StoreException;
041 import org.apache.oozie.workflow.WorkflowApp;
042 import org.apache.oozie.workflow.WorkflowException;
043 import org.apache.oozie.workflow.WorkflowInstance;
044 import org.apache.oozie.workflow.WorkflowLib;
045 import org.apache.oozie.util.ELEvaluator;
046 import org.apache.oozie.util.InstrumentUtils;
047 import org.apache.oozie.util.PropertiesUtils;
048 import org.apache.oozie.util.db.SLADbOperations;
049 import org.apache.oozie.service.SchemaService.SchemaName;
050 import org.apache.oozie.client.OozieClient;
051 import org.apache.oozie.client.WorkflowJob;
052 import org.apache.oozie.client.SLAEvent.SlaAppType;
053 import org.jdom.Element;
054 import org.jdom.Namespace;
055
056 import java.util.Date;
057 import java.util.List;
058 import java.util.Map;
059 import java.util.Set;
060 import java.util.HashSet;
061 import java.io.IOException;
062 import java.net.URI;
063
064 public class SubmitXCommand extends WorkflowXCommand<String> {
065 public static final String CONFIG_DEFAULT = "config-default.xml";
066
067 private Configuration conf;
068 private String authToken;
069
070 public SubmitXCommand(Configuration conf, String authToken) {
071 super("submit", "submit", 1);
072 this.conf = ParamChecker.notNull(conf, "conf");
073 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
074 }
075
076 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
077 private static final Set<String> DISALLOWED_USER_PROPERTIES = new HashSet<String>();
078
079 static {
080 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
081 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
082 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
083 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
084 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_USER_PROPERTIES);
085
086 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
087 WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME};
088 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
089 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
090 }
091
092 @Override
093 protected String execute() throws CommandException {
094 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
095 WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
096 try {
097 XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN));
098 WorkflowApp app = wps.parseDef(conf, authToken);
099 XConfiguration protoActionConf = wps.createProtoActionConf(conf, authToken, true);
100 WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
101
102 String user = conf.get(OozieClient.USER_NAME);
103 String group = conf.get(OozieClient.GROUP_NAME);
104 URI uri = new URI(conf.get(OozieClient.APP_PATH));
105 FileSystem fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user,
106 group, uri, new Configuration());
107
108 Path configDefault = null;
109 // app path could be a directory
110 Path path = new Path(uri.getPath());
111 if (!fs.isFile(path)) {
112 configDefault = new Path(path, SubmitCommand.CONFIG_DEFAULT);
113 } else {
114 configDefault = new Path(path.getParent(), SubmitCommand.CONFIG_DEFAULT);
115 }
116
117 if (fs.exists(configDefault)) {
118 try {
119 Configuration defaultConf = new XConfiguration(fs.open(configDefault));
120 PropertiesUtils.checkDisallowedProperties(defaultConf, DISALLOWED_DEFAULT_PROPERTIES);
121 XConfiguration.injectDefaults(defaultConf, conf);
122 }
123 catch (IOException ex) {
124 throw new IOException("default configuration file, " + ex.getMessage(), ex);
125 }
126 }
127
128 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_USER_PROPERTIES);
129
130 // Resolving all variables in the job properties.
131 // This ensures the Hadoop Configuration semantics is preserved.
132 XConfiguration resolvedVarsConf = new XConfiguration();
133 for (Map.Entry<String, String> entry : conf) {
134 resolvedVarsConf.set(entry.getKey(), conf.get(entry.getKey()));
135 }
136 conf = resolvedVarsConf;
137
138 WorkflowInstance wfInstance;
139 try {
140 wfInstance = workflowLib.createInstance(app, conf);
141 }
142 catch (WorkflowException e) {
143 throw new StoreException(e);
144 }
145
146 Configuration conf = wfInstance.getConf();
147 // System.out.println("WF INSTANCE CONF:");
148 // System.out.println(XmlUtils.prettyPrint(conf).toString());
149
150 WorkflowJobBean workflow = new WorkflowJobBean();
151 workflow.setId(wfInstance.getId());
152 workflow.setAppName(app.getName());
153 workflow.setAppPath(conf.get(OozieClient.APP_PATH));
154 workflow.setConf(XmlUtils.prettyPrint(conf).toString());
155 workflow.setProtoActionConf(protoActionConf.toXmlString());
156 workflow.setCreatedTime(new Date());
157 workflow.setLastModifiedTime(new Date());
158 workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
159 workflow.setStatus(WorkflowJob.Status.PREP);
160 workflow.setRun(0);
161 workflow.setUser(conf.get(OozieClient.USER_NAME));
162 workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
163 workflow.setAuthToken(authToken);
164 workflow.setWorkflowInstance(wfInstance);
165 workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));
166
167 //setLogInfo(workflow);
168 Element wfElem = XmlUtils.parseXml(app.getDefinition());
169 ELEvaluator evalSla = createELEvaluatorForGroup(conf, "wf-sla-submit");
170 String jobSlaXml = verifySlaElements(wfElem, evalSla);
171 writeSLARegistration(jobSlaXml, workflow.getId(), workflow.getUser(), workflow.getGroup(), LOG);
172 workflow.setSlaXml(jobSlaXml);
173 // System.out.println("SlaXml :"+ slaXml);
174
175 //store.insertWorkflow(workflow);
176 JPAService jpaService = Services.get().get(JPAService.class);
177 if (jpaService != null) {
178 jpaService.execute(new WorkflowJobInsertJPAExecutor(workflow));
179 }
180 else {
181 LOG.error(ErrorCode.E0610);
182 return null;
183 }
184
185 return workflow.getId();
186 }
187 catch (WorkflowException ex) {
188 throw new CommandException(ex);
189 }
190 catch (HadoopAccessorException ex) {
191 throw new CommandException(ex);
192 }
193 catch (Exception ex) {
194 throw new CommandException(ErrorCode.E0803, ex);
195 }
196 }
197
198 private String verifySlaElements(Element eWfJob, ELEvaluator evalSla) throws CommandException {
199 String jobSlaXml = "";
200 // Validate WF job
201 Element eSla = eWfJob.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
202 if (eSla != null) {
203 jobSlaXml = resolveSla(eSla, evalSla);
204 }
205
206 // Validate all actions
207 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
208 eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
209 if (eSla != null) {
210 resolveSla(eSla, evalSla);
211 }
212 }
213 return jobSlaXml;
214 }
215
216 private void writeSLARegistration(String slaXml, String id, String user, String group, XLog log)
217 throws CommandException {
218 try {
219 if (slaXml != null && slaXml.length() > 0) {
220 Element eSla = XmlUtils.parseXml(slaXml);
221 SLADbOperations.writeSlaRegistrationEvent(eSla, id, SlaAppType.WORKFLOW_JOB, user, group, log);
222 }
223 }
224 catch (Exception e) {
225 e.printStackTrace();
226 throw new CommandException(ErrorCode.E1007, "workflow " + id, e);
227 }
228 }
229
230 /**
231 * Resolve variables in sla xml element.
232 *
233 * @param eSla sla xml element
234 * @param evalSla sla evaluator
235 * @return sla xml string after evaluation
236 * @throws CommandException
237 */
238 public static String resolveSla(Element eSla, ELEvaluator evalSla) throws CommandException {
239 // EL evaluation
240 String slaXml = XmlUtils.prettyPrint(eSla).toString();
241 try {
242 slaXml = XmlUtils.removeComments(slaXml);
243 slaXml = evalSla.evaluate(slaXml, String.class);
244 XmlUtils.validateData(slaXml, SchemaName.SLA_ORIGINAL);
245 return slaXml;
246 }
247 catch (Exception e) {
248 throw new CommandException(ErrorCode.E1004, "Validation erro :" + e.getMessage(), e);
249 }
250 }
251
252 /**
253 * Create an EL evaluator for a given group.
254 *
255 * @param conf configuration variable
256 * @param group group variable
257 * @return the evaluator created for the group
258 */
259 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
260 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
261 for (Map.Entry<String, String> entry : conf) {
262 eval.setVariable(entry.getKey(), entry.getValue());
263 }
264 return eval;
265 }
266
267 @Override
268 protected String getEntityKey() {
269 return null;
270 }
271
272 @Override
273 protected boolean isLockRequired() {
274 return false;
275 }
276
277 @Override
278 protected void loadState() {
279
280 }
281
282 @Override
283 protected void verifyPrecondition() throws CommandException {
284
285 }
286
287 }