This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import org.apache.oozie.util.XLogStreamer;
021 import org.apache.oozie.service.XLogService;
022 import org.apache.oozie.service.DagXLogInfoService;
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.oozie.client.CoordinatorJob;
025 import org.apache.oozie.client.WorkflowJob;
026 import org.apache.oozie.client.OozieClient;
027 import org.apache.oozie.command.CommandException;
028 import org.apache.oozie.command.wf.CompletedActionXCommand;
029 import org.apache.oozie.command.wf.DefinitionXCommand;
030 import org.apache.oozie.command.wf.ExternalIdXCommand;
031 import org.apache.oozie.command.wf.JobXCommand;
032 import org.apache.oozie.command.wf.JobsXCommand;
033 import org.apache.oozie.command.wf.KillXCommand;
034 import org.apache.oozie.command.wf.ReRunXCommand;
035 import org.apache.oozie.command.wf.ResumeXCommand;
036 import org.apache.oozie.command.wf.StartXCommand;
037 import org.apache.oozie.command.wf.SubmitHttpXCommand;
038 import org.apache.oozie.command.wf.SubmitMRXCommand;
039 import org.apache.oozie.command.wf.SubmitPigXCommand;
040 import org.apache.oozie.command.wf.SubmitXCommand;
041 import org.apache.oozie.command.wf.SuspendXCommand;
042 import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
043 import org.apache.oozie.service.Services;
044 import org.apache.oozie.service.CallableQueueService;
045 import org.apache.oozie.util.ParamChecker;
046 import org.apache.oozie.util.XCallable;
047 import org.apache.oozie.util.XLog;
048
049 import java.io.Writer;
050 import java.util.Date;
051 import java.util.List;
052 import java.util.Properties;
053 import java.util.Set;
054 import java.util.HashSet;
055 import java.util.StringTokenizer;
056 import java.util.Map;
057 import java.util.HashMap;
058 import java.util.ArrayList;
059 import java.io.IOException;
060
061 /**
062 * The DagEngine provides all the DAG engine functionality for WS calls.
063 */
064 public class DagEngine extends BaseEngine {
065
066 private static final int HIGH_PRIORITY = 2;
067 private static XLog LOG = XLog.getLog(DagEngine.class);
068
069 /**
070 * Create a system Dag engine, with no user and no group.
071 */
072 public DagEngine() {
073 if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
074 LOG.debug("Oozie DagEngine is not using XCommands.");
075 }
076 else {
077 LOG.debug("Oozie DagEngine is using XCommands.");
078 }
079 }
080
081 /**
082 * Create a Dag engine to perform operations on behave of a user.
083 *
084 * @param user user name.
085 * @param authToken the authentication token.
086 */
087 public DagEngine(String user, String authToken) {
088 this();
089
090 this.user = ParamChecker.notEmpty(user, "user");
091 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
092 }
093
094 /**
095 * Submit a workflow job. <p/> It validates configuration properties.
096 *
097 * @param conf job configuration.
098 * @param startJob indicates if the job should be started or not.
099 * @return the job Id.
100 * @throws DagEngineException thrown if the job could not be created.
101 */
102 @Override
103 public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
104 validateSubmitConfiguration(conf);
105
106 try {
107 String jobId;
108 SubmitXCommand submit = new SubmitXCommand(conf, getAuthToken());
109 jobId = submit.call();
110 if (startJob) {
111 start(jobId);
112 }
113 return jobId;
114 }
115 catch (CommandException ex) {
116 throw new DagEngineException(ex);
117 }
118 }
119
120 /**
121 * Submit a pig/mapreduce job through HTTP.
122 * <p/>
123 * It validates configuration properties.
124 *
125 * @param conf job configuration.
126 * @param jobType job type - can be "pig" or "mapreduce".
127 * @return the job Id.
128 * @throws DagEngineException thrown if the job could not be created.
129 */
130 public String submitHttpJob(Configuration conf, String jobType) throws DagEngineException {
131 validateSubmitConfiguration(conf);
132
133 try {
134 String jobId;
135 SubmitHttpXCommand submit = null;
136 if (jobType.equals("pig")) {
137 submit = new SubmitPigXCommand(conf, getAuthToken());
138 }
139 else if (jobType.equals("mapreduce")) {
140 submit = new SubmitMRXCommand(conf, getAuthToken());
141 }
142
143 jobId = submit.call();
144 start(jobId);
145 return jobId;
146 }
147 catch (CommandException ex) {
148 throw new DagEngineException(ex);
149 }
150 }
151
152 private void validateSubmitConfiguration(Configuration conf) throws DagEngineException {
153 if (conf.get(OozieClient.APP_PATH) == null) {
154 throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
155 }
156 }
157
158 /**
159 * Start a job.
160 *
161 * @param jobId job Id.
162 * @throws DagEngineException thrown if the job could not be started.
163 */
164 @Override
165 public void start(String jobId) throws DagEngineException {
166 // Changing to synchronous call from asynchronous queuing to prevent the
167 // loss of command if the queue is full or the queue is lost in case of
168 // failure.
169 try {
170 new StartXCommand(jobId).call();
171 }
172 catch (CommandException e) {
173 throw new DagEngineException(e);
174 }
175 }
176
177 /**
178 * Resume a job.
179 *
180 * @param jobId job Id.
181 * @throws DagEngineException thrown if the job could not be resumed.
182 */
183 @Override
184 public void resume(String jobId) throws DagEngineException {
185 // Changing to synchronous call from asynchronous queuing to prevent the
186 // loss of command if the queue is full or the queue is lost in case of
187 // failure.
188 try {
189 new ResumeXCommand(jobId).call();
190 }
191 catch (CommandException e) {
192 throw new DagEngineException(e);
193 }
194 }
195
196 /**
197 * Suspend a job.
198 *
199 * @param jobId job Id.
200 * @throws DagEngineException thrown if the job could not be suspended.
201 */
202 @Override
203 public void suspend(String jobId) throws DagEngineException {
204 // Changing to synchronous call from asynchronous queuing to prevent the
205 // loss of command if the queue is full or the queue is lost in case of
206 // failure.
207 try {
208 new SuspendXCommand(jobId).call();
209 }
210 catch (CommandException e) {
211 throw new DagEngineException(e);
212 }
213 }
214
215 /**
216 * Kill a job.
217 *
218 * @param jobId job Id.
219 * @throws DagEngineException thrown if the job could not be killed.
220 */
221 @Override
222 public void kill(String jobId) throws DagEngineException {
223 // Changing to synchronous call from asynchronous queuing to prevent the
224 // loss of command if the queue is full or the queue is lost in case of
225 // failure.
226 try {
227 new KillXCommand(jobId).call();
228 LOG.info("User " + user + " killed the WF job " + jobId);
229 }
230 catch (CommandException e) {
231 throw new DagEngineException(e);
232 }
233 }
234
235 /* (non-Javadoc)
236 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
237 */
238 @Override
239 public void change(String jobId, String changeValue) throws DagEngineException {
240 // This code should not be reached.
241 throw new DagEngineException(ErrorCode.E1017);
242 }
243
244 /**
245 * Rerun a job.
246 *
247 * @param jobId job Id to rerun.
248 * @param conf configuration information for the rerun.
249 * @throws DagEngineException thrown if the job could not be rerun.
250 */
251 @Override
252 public void reRun(String jobId, Configuration conf) throws DagEngineException {
253 try {
254 validateReRunConfiguration(conf);
255 new ReRunXCommand(jobId, conf, getAuthToken()).call();
256 start(jobId);
257 }
258 catch (CommandException ex) {
259 throw new DagEngineException(ex);
260 }
261 }
262
263 private void validateReRunConfiguration(Configuration conf) throws DagEngineException {
264 if (conf.get(OozieClient.APP_PATH) == null) {
265 throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
266 }
267 if (conf.get(OozieClient.RERUN_SKIP_NODES) == null && conf.get(OozieClient.RERUN_FAIL_NODES) == null) {
268 throw new DagEngineException(ErrorCode.E0401, OozieClient.RERUN_SKIP_NODES + " OR "
269 + OozieClient.RERUN_FAIL_NODES);
270 }
271 if (conf.get(OozieClient.RERUN_SKIP_NODES) != null && conf.get(OozieClient.RERUN_FAIL_NODES) != null) {
272 throw new DagEngineException(ErrorCode.E0404, OozieClient.RERUN_SKIP_NODES + " OR "
273 + OozieClient.RERUN_FAIL_NODES);
274 }
275 }
276
277 /**
278 * Process an action callback.
279 *
280 * @param actionId the action Id.
281 * @param externalStatus the action external status.
282 * @param actionData the action output data, <code>null</code> if none.
283 * @throws DagEngineException thrown if the callback could not be processed.
284 */
285 public void processCallback(String actionId, String externalStatus, Properties actionData)
286 throws DagEngineException {
287 XLog.Info.get().clearParameter(XLogService.GROUP);
288 XLog.Info.get().clearParameter(XLogService.USER);
289 XCallable<Void> command = null;
290
291 command = new CompletedActionXCommand(actionId, externalStatus,
292 actionData, HIGH_PRIORITY);
293 if (!Services.get().get(CallableQueueService.class).queue(command)) {
294 LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
295 }
296 }
297
298 /**
299 * Return the info about a job.
300 *
301 * @param jobId job Id.
302 * @return the workflow job info.
303 * @throws DagEngineException thrown if the job info could not be obtained.
304 */
305 @Override
306 public WorkflowJob getJob(String jobId) throws DagEngineException {
307 try {
308 return new JobXCommand(jobId).call();
309 }
310 catch (CommandException ex) {
311 throw new DagEngineException(ex);
312 }
313 }
314
315 /**
316 * Return the info about a job with actions subset.
317 *
318 * @param jobId job Id
319 * @param start starting from this index in the list of actions belonging to the job
320 * @param length number of actions to be returned
321 * @return the workflow job info.
322 * @throws DagEngineException thrown if the job info could not be obtained.
323 */
324 @Override
325 public WorkflowJob getJob(String jobId, int start, int length) throws DagEngineException {
326 try {
327 return new JobXCommand(jobId, start, length).call();
328 }
329 catch (CommandException ex) {
330 throw new DagEngineException(ex);
331 }
332 }
333
334 /**
335 * Return the a job definition.
336 *
337 * @param jobId job Id.
338 * @return the job definition.
339 * @throws DagEngineException thrown if the job definition could no be obtained.
340 */
341 @Override
342 public String getDefinition(String jobId) throws DagEngineException {
343 try {
344 return new DefinitionXCommand(jobId).call();
345 }
346 catch (CommandException ex) {
347 throw new DagEngineException(ex);
348 }
349 }
350
351 /**
352 * Stream the log of a job.
353 *
354 * @param jobId job Id.
355 * @param writer writer to stream the log to.
356 * @throws IOException thrown if the log cannot be streamed.
357 * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
358 */
359 @Override
360 public void streamLog(String jobId, Writer writer) throws IOException, DagEngineException {
361 XLogStreamer.Filter filter = new XLogStreamer.Filter();
362 filter.setParameter(DagXLogInfoService.JOB, jobId);
363 WorkflowJob job = getJob(jobId);
364 Date lastTime = job.getEndTime();
365 if (lastTime == null) {
366 lastTime = job.getLastModifiedTime();
367 }
368 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer);
369 }
370
371 private static final Set<String> FILTER_NAMES = new HashSet<String>();
372
373 static {
374 FILTER_NAMES.add(OozieClient.FILTER_USER);
375 FILTER_NAMES.add(OozieClient.FILTER_NAME);
376 FILTER_NAMES.add(OozieClient.FILTER_GROUP);
377 FILTER_NAMES.add(OozieClient.FILTER_STATUS);
378 FILTER_NAMES.add(OozieClient.FILTER_ID);
379 }
380
381 /**
382 * Validate a jobs filter.
383 *
384 * @param filter filter to validate.
385 * @return the parsed filter.
386 * @throws DagEngineException thrown if the filter is invalid.
387 */
388 protected Map<String, List<String>> parseFilter(String filter) throws DagEngineException {
389 Map<String, List<String>> map = new HashMap<String, List<String>>();
390 if (filter != null) {
391 StringTokenizer st = new StringTokenizer(filter, ";");
392 while (st.hasMoreTokens()) {
393 String token = st.nextToken();
394 if (token.contains("=")) {
395 String[] pair = token.split("=");
396 if (pair.length != 2) {
397 throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
398 }
399 if (!FILTER_NAMES.contains(pair[0])) {
400 throw new DagEngineException(ErrorCode.E0420, filter, XLog
401 .format("invalid name [{0}]", pair[0]));
402 }
403 if (pair[0].equals("status")) {
404 try {
405 WorkflowJob.Status.valueOf(pair[1]);
406 }
407 catch (IllegalArgumentException ex) {
408 throw new DagEngineException(ErrorCode.E0420, filter, XLog.format("invalid status [{0}]",
409 pair[1]));
410 }
411 }
412 List<String> list = map.get(pair[0]);
413 if (list == null) {
414 list = new ArrayList<String>();
415 map.put(pair[0], list);
416 }
417 list.add(pair[1]);
418 }
419 else {
420 throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
421 }
422 }
423 }
424 return map;
425 }
426
427 /**
428 * Return the info about a set of jobs.
429 *
430 * @param filter job filter. Refer to the {@link org.apache.oozie.client.OozieClient} for the filter syntax.
431 * @param start offset, base 1.
432 * @param len number of jobs to return.
433 * @return job info for all matching jobs, the jobs don't contain node action information.
434 * @throws DagEngineException thrown if the jobs info could not be obtained.
435 */
436 public WorkflowsInfo getJobs(String filter, int start, int len) throws DagEngineException {
437 Map<String, List<String>> filterList = parseFilter(filter);
438 try {
439 return new JobsXCommand(filterList, start, len).call();
440 }
441 catch (CommandException dce) {
442 throw new DagEngineException(dce);
443 }
444 }
445
446 /**
447 * Return the workflow Job ID for an external ID. <p/> This is reverse lookup for recovery purposes.
448 *
449 * @param externalId external ID provided at job submission time.
450 * @return the associated workflow job ID if any, <code>null</code> if none.
451 * @throws DagEngineException thrown if the lookup could not be done.
452 */
453 @Override
454 public String getJobIdForExternalId(String externalId) throws DagEngineException {
455 try {
456 return new ExternalIdXCommand(externalId).call();
457 }
458 catch (CommandException dce) {
459 throw new DagEngineException(dce);
460 }
461 }
462
463 @Override
464 public CoordinatorJob getCoordJob(String jobId) throws BaseEngineException {
465 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
466 }
467
468 @Override
469 public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length) throws BaseEngineException {
470 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
471 }
472
473 public WorkflowActionBean getWorkflowAction(String actionId) throws BaseEngineException {
474 try {
475 return new WorkflowActionInfoXCommand(actionId).call();
476 }
477 catch (CommandException ex) {
478 throw new BaseEngineException(ex);
479 }
480 }
481
482 /* (non-Javadoc)
483 * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
484 */
485 @Override
486 public String dryRunSubmit(Configuration conf) throws BaseEngineException {
487 try {
488 SubmitXCommand submit = new SubmitXCommand(true, conf, getAuthToken());
489 return submit.call();
490 } catch (CommandException ex) {
491 throw new DagEngineException(ex);
492 }
493 }
494 }