This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import org.apache.oozie.util.XLogStreamer;
021 import org.apache.oozie.service.XLogService;
022 import org.apache.oozie.service.DagXLogInfoService;
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.oozie.client.CoordinatorJob;
025 import org.apache.oozie.client.WorkflowJob;
026 import org.apache.oozie.client.OozieClient;
027 import org.apache.oozie.command.CommandException;
028 import org.apache.oozie.command.XCommand;
029 import org.apache.oozie.command.wf.CompletedActionCommand;
030 import org.apache.oozie.command.wf.CompletedActionXCommand;
031 import org.apache.oozie.command.wf.DefinitionCommand;
032 import org.apache.oozie.command.wf.DefinitionXCommand;
033 import org.apache.oozie.command.wf.ExternalIdCommand;
034 import org.apache.oozie.command.wf.ExternalIdXCommand;
035 import org.apache.oozie.command.wf.JobCommand;
036 import org.apache.oozie.command.wf.JobXCommand;
037 import org.apache.oozie.command.wf.JobsCommand;
038 import org.apache.oozie.command.wf.JobsXCommand;
039 import org.apache.oozie.command.wf.KillCommand;
040 import org.apache.oozie.command.wf.KillXCommand;
041 import org.apache.oozie.command.wf.ReRunCommand;
042 import org.apache.oozie.command.wf.ReRunXCommand;
043 import org.apache.oozie.command.wf.ResumeCommand;
044 import org.apache.oozie.command.wf.ResumeXCommand;
045 import org.apache.oozie.command.wf.StartCommand;
046 import org.apache.oozie.command.wf.StartXCommand;
047 import org.apache.oozie.command.wf.SubmitCommand;
048 import org.apache.oozie.command.wf.SubmitHttpCommand;
049 import org.apache.oozie.command.wf.SubmitHttpXCommand;
050 import org.apache.oozie.command.wf.SubmitMRCommand;
051 import org.apache.oozie.command.wf.SubmitMRXCommand;
052 import org.apache.oozie.command.wf.SubmitPigCommand;
053 import org.apache.oozie.command.wf.SubmitPigXCommand;
054 import org.apache.oozie.command.wf.SubmitXCommand;
055 import org.apache.oozie.command.wf.SuspendCommand;
056 import org.apache.oozie.command.wf.SuspendXCommand;
057 import org.apache.oozie.command.wf.WorkflowActionInfoCommand;
058 import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
059 import org.apache.oozie.service.Services;
060 import org.apache.oozie.service.CallableQueueService;
061 import org.apache.oozie.util.ParamChecker;
062 import org.apache.oozie.util.XCallable;
063 import org.apache.oozie.util.XLog;
064
065 import java.io.Writer;
066 import java.util.Date;
067 import java.util.List;
068 import java.util.Properties;
069 import java.util.Set;
070 import java.util.HashSet;
071 import java.util.StringTokenizer;
072 import java.util.Map;
073 import java.util.HashMap;
074 import java.util.ArrayList;
075 import java.io.IOException;
076
077 /**
078 * The DagEngine provides all the DAG engine functionality for WS calls.
079 */
080 public class DagEngine extends BaseEngine {
081
082 private static final int HIGH_PRIORITY = 2;
083 private boolean useXCommand = true;
084 private static XLog LOG = XLog.getLog(DagEngine.class);
085
/**
 * Create a system Dag engine, with no user and no group.
 * <p/>
 * Reads the boolean <code>USE_XCOMMAND</code> setting from the services configuration (defaulting to
 * <code>true</code>) and logs, at debug level, which command family this engine will use.
 */
public DagEngine() {
    useXCommand = Services.get().getConf().getBoolean(USE_XCOMMAND, true);
    if (useXCommand) {
        LOG.debug("Oozie DagEngine is using XCommands.");
    }
    else {
        LOG.debug("Oozie DagEngine is not using XCommands.");
    }
}
098
099 /**
100 * Create a Dag engine to perform operations on behave of a user.
101 *
102 * @param user user name.
103 * @param authToken the authentication token.
104 */
105 public DagEngine(String user, String authToken) {
106 this();
107
108 this.user = ParamChecker.notEmpty(user, "user");
109 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
110 }
111
112 /**
113 * Submit a workflow job. <p/> It validates configuration properties.
114 *
115 * @param conf job configuration.
116 * @param startJob indicates if the job should be started or not.
117 * @return the job Id.
118 * @throws DagEngineException thrown if the job could not be created.
119 */
120 @Override
121 public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
122 validateSubmitConfiguration(conf);
123
124 try {
125 String jobId;
126 if (useXCommand) {
127 SubmitXCommand submit = new SubmitXCommand(conf, getAuthToken());
128 jobId = submit.call();
129 }
130 else {
131 SubmitCommand submit = new SubmitCommand(conf, getAuthToken());
132 jobId = submit.call();
133 }
134 if (startJob) {
135 start(jobId);
136 }
137 return jobId;
138 }
139 catch (CommandException ex) {
140 throw new DagEngineException(ex);
141 }
142 }
143
144 /**
145 * Submit a pig/mapreduce job through HTTP.
146 * <p/>
147 * It validates configuration properties.
148 *
149 * @param conf job configuration.
150 * @param jobType job type - can be "pig" or "mapreduce".
151 * @return the job Id.
152 * @throws DagEngineException thrown if the job could not be created.
153 */
154 public String submitHttpJob(Configuration conf, String jobType) throws DagEngineException {
155 validateSubmitConfiguration(conf);
156
157 try {
158 String jobId;
159 if (useXCommand) {
160 SubmitHttpXCommand submit = null;
161 if (jobType.equals("pig")) {
162 submit = new SubmitPigXCommand(conf, getAuthToken());
163 }
164 else if (jobType.equals("mapreduce")) {
165 submit = new SubmitMRXCommand(conf, getAuthToken());
166 }
167
168 jobId = submit.call();
169 }
170 else {
171 SubmitHttpCommand submit = null;
172 if (jobType.equals("pig")) {
173 submit = new SubmitPigCommand(conf, getAuthToken());
174 }
175 else if (jobType.equals("mapreduce")) {
176 submit = new SubmitMRCommand(conf, getAuthToken());
177 }
178
179 jobId = submit.call();
180 }
181 start(jobId);
182 return jobId;
183 }
184 catch (CommandException ex) {
185 throw new DagEngineException(ex);
186 }
187 }
188
189 private void validateSubmitConfiguration(Configuration conf) throws DagEngineException {
190 if (conf.get(OozieClient.APP_PATH) == null) {
191 throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
192 }
193 }
194
195 /**
196 * Start a job.
197 *
198 * @param jobId job Id.
199 * @throws DagEngineException thrown if the job could not be started.
200 */
201 @Override
202 public void start(String jobId) throws DagEngineException {
203 // Changing to synchronous call from asynchronous queuing to prevent the
204 // loss of command if the queue is full or the queue is lost in case of
205 // failure.
206 try {
207 if (useXCommand) {
208 new StartXCommand(jobId).call();
209 }
210 else {
211 new StartCommand(jobId).call();
212 }
213 }
214 catch (CommandException e) {
215 throw new DagEngineException(e);
216 }
217 }
218
219 /**
220 * Resume a job.
221 *
222 * @param jobId job Id.
223 * @throws DagEngineException thrown if the job could not be resumed.
224 */
225 @Override
226 public void resume(String jobId) throws DagEngineException {
227 // Changing to synchronous call from asynchronous queuing to prevent the
228 // loss of command if the queue is full or the queue is lost in case of
229 // failure.
230 try {
231 if (useXCommand) {
232 new ResumeXCommand(jobId).call();
233 }
234 else {
235 new ResumeCommand(jobId).call();
236 }
237 }
238 catch (CommandException e) {
239 throw new DagEngineException(e);
240 }
241 }
242
243 /**
244 * Suspend a job.
245 *
246 * @param jobId job Id.
247 * @throws DagEngineException thrown if the job could not be suspended.
248 */
249 @Override
250 public void suspend(String jobId) throws DagEngineException {
251 // Changing to synchronous call from asynchronous queuing to prevent the
252 // loss of command if the queue is full or the queue is lost in case of
253 // failure.
254 try {
255 if (useXCommand) {
256 new SuspendXCommand(jobId).call();
257 }
258 else {
259 new SuspendCommand(jobId).call();
260 }
261 }
262 catch (CommandException e) {
263 throw new DagEngineException(e);
264 }
265 }
266
267 /**
268 * Kill a job.
269 *
270 * @param jobId job Id.
271 * @throws DagEngineException thrown if the job could not be killed.
272 */
273 @Override
274 public void kill(String jobId) throws DagEngineException {
275 // Changing to synchronous call from asynchronous queuing to prevent the
276 // loss of command if the queue is full or the queue is lost in case of
277 // failure.
278 try {
279 if (useXCommand) {
280 new KillXCommand(jobId).call();
281 }
282 else {
283 new KillCommand(jobId).call();
284 }
285 LOG.info("User " + user + " killed the WF job " + jobId);
286 }
287 catch (CommandException e) {
288 throw new DagEngineException(e);
289 }
290 }
291
292 /* (non-Javadoc)
293 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
294 */
295 @Override
296 public void change(String jobId, String changeValue) throws DagEngineException {
297 // This code should not be reached.
298 throw new DagEngineException(ErrorCode.E1017);
299 }
300
301 /**
302 * Rerun a job.
303 *
304 * @param jobId job Id to rerun.
305 * @param conf configuration information for the rerun.
306 * @throws DagEngineException thrown if the job could not be rerun.
307 */
308 @Override
309 public void reRun(String jobId, Configuration conf) throws DagEngineException {
310 try {
311 validateReRunConfiguration(conf);
312
313 if (useXCommand) {
314 new ReRunXCommand(jobId, conf, getAuthToken()).call();
315 }
316 else {
317 new ReRunCommand(jobId, conf, getAuthToken()).call();
318 }
319 start(jobId);
320 }
321 catch (CommandException ex) {
322 throw new DagEngineException(ex);
323 }
324 }
325
326 private void validateReRunConfiguration(Configuration conf) throws DagEngineException {
327 if (conf.get(OozieClient.APP_PATH) == null) {
328 throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
329 }
330 if (conf.get(OozieClient.RERUN_SKIP_NODES) == null && conf.get(OozieClient.RERUN_FAIL_NODES) == null) {
331 throw new DagEngineException(ErrorCode.E0401, OozieClient.RERUN_SKIP_NODES + " OR "
332 + OozieClient.RERUN_FAIL_NODES);
333 }
334 if (conf.get(OozieClient.RERUN_SKIP_NODES) != null && conf.get(OozieClient.RERUN_FAIL_NODES) != null) {
335 throw new DagEngineException(ErrorCode.E0404, OozieClient.RERUN_SKIP_NODES + " OR "
336 + OozieClient.RERUN_FAIL_NODES);
337 }
338 }
339
340 /**
341 * Process an action callback.
342 *
343 * @param actionId the action Id.
344 * @param externalStatus the action external status.
345 * @param actionData the action output data, <code>null</code> if none.
346 * @throws DagEngineException thrown if the callback could not be processed.
347 */
348 public void processCallback(String actionId, String externalStatus, Properties actionData)
349 throws DagEngineException {
350 XLog.Info.get().clearParameter(XLogService.GROUP);
351 XLog.Info.get().clearParameter(XLogService.USER);
352 XCallable<Void> command = null;
353
354 if (useXCommand) {
355 command = new CompletedActionXCommand(actionId, externalStatus, actionData, HIGH_PRIORITY);
356 }
357 else {
358 command = new CompletedActionCommand(actionId, externalStatus, actionData, HIGH_PRIORITY);
359 }
360 if (!Services.get().get(CallableQueueService.class).queue(command)) {
361 LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
362 }
363 }
364
365 /**
366 * Return the info about a job.
367 *
368 * @param jobId job Id.
369 * @return the workflow job info.
370 * @throws DagEngineException thrown if the job info could not be obtained.
371 */
372 @Override
373 public WorkflowJob getJob(String jobId) throws DagEngineException {
374 try {
375 if (useXCommand) {
376 return new JobXCommand(jobId).call();
377 }
378 else {
379 return new JobCommand(jobId).call();
380 }
381 }
382 catch (CommandException ex) {
383 throw new DagEngineException(ex);
384 }
385 }
386
387 /**
388 * Return the info about a job with actions subset.
389 *
390 * @param jobId job Id
391 * @param start starting from this index in the list of actions belonging to the job
392 * @param length number of actions to be returned
393 * @return the workflow job info.
394 * @throws DagEngineException thrown if the job info could not be obtained.
395 */
396 @Override
397 public WorkflowJob getJob(String jobId, int start, int length) throws DagEngineException {
398 try {
399 if (useXCommand) {
400 return new JobXCommand(jobId, start, length).call();
401 }
402 else {
403 return new JobCommand(jobId, start, length).call();
404 }
405 }
406 catch (CommandException ex) {
407 throw new DagEngineException(ex);
408 }
409 }
410
411 /**
412 * Return the a job definition.
413 *
414 * @param jobId job Id.
415 * @return the job definition.
416 * @throws DagEngineException thrown if the job definition could no be obtained.
417 */
418 @Override
419 public String getDefinition(String jobId) throws DagEngineException {
420 try {
421 if (useXCommand) {
422 return new DefinitionXCommand(jobId).call();
423 }
424 else {
425 return new DefinitionCommand(jobId).call();
426 }
427 }
428 catch (CommandException ex) {
429 throw new DagEngineException(ex);
430 }
431 }
432
433 /**
434 * Stream the log of a job.
435 *
436 * @param jobId job Id.
437 * @param writer writer to stream the log to.
438 * @throws IOException thrown if the log cannot be streamed.
439 * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
440 */
441 @Override
442 public void streamLog(String jobId, Writer writer) throws IOException, DagEngineException {
443 XLogStreamer.Filter filter = new XLogStreamer.Filter();
444 filter.setParameter(DagXLogInfoService.JOB, jobId);
445 WorkflowJob job = getJob(jobId);
446 Date lastTime = job.getEndTime();
447 if (lastTime == null) {
448 lastTime = job.getLastModifiedTime();
449 }
450 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer);
451 }
452
/** Names accepted on the left-hand side of a <code>name=value</code> element of a jobs filter. */
private static final Set<String> FILTER_NAMES = new HashSet<String>();

static {
    final String[] supportedNames = {
        OozieClient.FILTER_USER,
        OozieClient.FILTER_NAME,
        OozieClient.FILTER_GROUP,
        OozieClient.FILTER_STATUS,
        OozieClient.FILTER_ID
    };
    for (String supportedName : supportedNames) {
        FILTER_NAMES.add(supportedName);
    }
}
462
463 /**
464 * Validate a jobs filter.
465 *
466 * @param filter filter to validate.
467 * @return the parsed filter.
468 * @throws DagEngineException thrown if the filter is invalid.
469 */
470 protected Map<String, List<String>> parseFilter(String filter) throws DagEngineException {
471 Map<String, List<String>> map = new HashMap<String, List<String>>();
472 if (filter != null) {
473 StringTokenizer st = new StringTokenizer(filter, ";");
474 while (st.hasMoreTokens()) {
475 String token = st.nextToken();
476 if (token.contains("=")) {
477 String[] pair = token.split("=");
478 if (pair.length != 2) {
479 throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
480 }
481 if (!FILTER_NAMES.contains(pair[0])) {
482 throw new DagEngineException(ErrorCode.E0420, filter, XLog
483 .format("invalid name [{0}]", pair[0]));
484 }
485 if (pair[0].equals("status")) {
486 try {
487 WorkflowJob.Status.valueOf(pair[1]);
488 }
489 catch (IllegalArgumentException ex) {
490 throw new DagEngineException(ErrorCode.E0420, filter, XLog.format("invalid status [{0}]",
491 pair[1]));
492 }
493 }
494 List<String> list = map.get(pair[0]);
495 if (list == null) {
496 list = new ArrayList<String>();
497 map.put(pair[0], list);
498 }
499 list.add(pair[1]);
500 }
501 else {
502 throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
503 }
504 }
505 }
506 return map;
507 }
508
509 /**
510 * Return the info about a set of jobs.
511 *
512 * @param filterStr job filter. Refer to the {@link org.apache.oozie.client.OozieClient} for the filter syntax.
513 * @param start offset, base 1.
514 * @param len number of jobs to return.
515 * @return job info for all matching jobs, the jobs don't contain node action information.
516 * @throws DagEngineException thrown if the jobs info could not be obtained.
517 */
518 public WorkflowsInfo getJobs(String filterStr, int start, int len) throws DagEngineException {
519 Map<String, List<String>> filter = parseFilter(filterStr);
520 try {
521 if (useXCommand) {
522 return new JobsXCommand(filter, start, len).call();
523 }
524 else {
525 return new JobsCommand(filter, start, len).call();
526 }
527 }
528 catch (CommandException dce) {
529 throw new DagEngineException(dce);
530 }
531 }
532
533 /**
534 * Return the workflow Job ID for an external ID. <p/> This is reverse lookup for recovery purposes.
535 *
536 * @param externalId external ID provided at job submission time.
537 * @return the associated workflow job ID if any, <code>null</code> if none.
538 * @throws DagEngineException thrown if the lookup could not be done.
539 */
540 @Override
541 public String getJobIdForExternalId(String externalId) throws DagEngineException {
542 try {
543 if (useXCommand) {
544 return new ExternalIdXCommand(externalId).call();
545 }
546 else {
547 return new ExternalIdCommand(externalId).call();
548 }
549 }
550 catch (CommandException dce) {
551 throw new DagEngineException(dce);
552 }
553 }
554
555 @Override
556 public CoordinatorJob getCoordJob(String jobId) throws BaseEngineException {
557 throw new BaseEngineException(new XException(ErrorCode.E0301));
558 }
559
560 @Override
561 public CoordinatorJob getCoordJob(String jobId, int start, int length) throws BaseEngineException {
562 throw new BaseEngineException(new XException(ErrorCode.E0301));
563 }
564
565 public WorkflowActionBean getWorkflowAction(String actionId) throws BaseEngineException {
566 try {
567 if (useXCommand) {
568 return new WorkflowActionInfoXCommand(actionId).call();
569 }
570 else {
571 return new WorkflowActionInfoCommand(actionId).call();
572 }
573 }
574 catch (CommandException ex) {
575 throw new BaseEngineException(ex);
576 }
577 }
578
579 @Override
580 public String dryrunSubmit(Configuration conf, boolean startJob) throws BaseEngineException {
581 return null;
582 }
583 }