// NOTE: This project has been retired. For details please refer to its Apache Attic page.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.wf.CompletedActionXCommand;
import org.apache.oozie.command.wf.DefinitionXCommand;
import org.apache.oozie.command.wf.ExternalIdXCommand;
import org.apache.oozie.command.wf.JobXCommand;
import org.apache.oozie.command.wf.JobsXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ReRunXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.StartXCommand;
import org.apache.oozie.command.wf.SubmitHiveXCommand;
import org.apache.oozie.command.wf.SubmitHttpXCommand;
import org.apache.oozie.command.wf.SubmitMRXCommand;
import org.apache.oozie.command.wf.SubmitPigXCommand;
import org.apache.oozie.command.wf.SubmitXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogStreamer;
062 /**
063 * The DagEngine provides all the DAG engine functionality for WS calls.
064 */
065 public class DagEngine extends BaseEngine {
066
067 private static final int HIGH_PRIORITY = 2;
068 private static XLog LOG = XLog.getLog(DagEngine.class);
069
070 /**
071 * Create a system Dag engine, with no user and no group.
072 */
073 public DagEngine() {
074 if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
075 LOG.debug("Oozie DagEngine is not using XCommands.");
076 }
077 else {
078 LOG.debug("Oozie DagEngine is using XCommands.");
079 }
080 }
081
082 /**
083 * Create a Dag engine to perform operations on behave of a user.
084 *
085 * @param user user name.
086 */
087 public DagEngine(String user) {
088 this();
089
090 this.user = ParamChecker.notEmpty(user, "user");
091 }
092
093 /**
094 * Submit a workflow job. <p/> It validates configuration properties.
095 *
096 * @param conf job configuration.
097 * @param startJob indicates if the job should be started or not.
098 * @return the job Id.
099 * @throws DagEngineException thrown if the job could not be created.
100 */
101 @Override
102 public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
103 validateSubmitConfiguration(conf);
104
105 try {
106 String jobId;
107 SubmitXCommand submit = new SubmitXCommand(conf);
108 jobId = submit.call();
109 if (startJob) {
110 start(jobId);
111 }
112 return jobId;
113 }
114 catch (CommandException ex) {
115 throw new DagEngineException(ex);
116 }
117 }
118
119 /**
120 * Submit a workflow through a coordinator. It validates configuration properties.
121 * @param conf job conf
122 * @param parentId parent of workflow
123 * @return
124 * @throws DagEngineException
125 */
126 public String submitJobFromCoordinator(Configuration conf, String parentId) throws DagEngineException {
127 validateSubmitConfiguration(conf);
128 try {
129 String jobId;
130 SubmitXCommand submit = new SubmitXCommand(conf, parentId);
131 jobId = submit.call();
132 start(jobId);
133 return jobId;
134 }
135 catch (CommandException ex) {
136 throw new DagEngineException(ex);
137 }
138 }
139
140 /**
141 * Submit a pig/hive/mapreduce job through HTTP.
142 * <p/>
143 * It validates configuration properties.
144 *
145 * @param conf job configuration.
146 * @param jobType job type - can be "pig", "hive, or "mapreduce".
147 * @return the job Id.
148 * @throws DagEngineException thrown if the job could not be created.
149 */
150 public String submitHttpJob(Configuration conf, String jobType) throws DagEngineException {
151 validateSubmitConfiguration(conf);
152
153 try {
154 String jobId;
155 SubmitHttpXCommand submit = null;
156 if (jobType.equals("pig")) {
157 submit = new SubmitPigXCommand(conf);
158 }
159 else if (jobType.equals("mapreduce")) {
160 submit = new SubmitMRXCommand(conf);
161 }
162 else if (jobType.equals("hive")) {
163 submit = new SubmitHiveXCommand(conf);
164 }
165
166 jobId = submit.call();
167 start(jobId);
168 return jobId;
169 }
170 catch (CommandException ex) {
171 throw new DagEngineException(ex);
172 }
173 }
174
175 private void validateSubmitConfiguration(Configuration conf) throws DagEngineException {
176 if (conf.get(OozieClient.APP_PATH) == null) {
177 throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
178 }
179 }
180
181 /**
182 * Start a job.
183 *
184 * @param jobId job Id.
185 * @throws DagEngineException thrown if the job could not be started.
186 */
187 @Override
188 public void start(String jobId) throws DagEngineException {
189 // Changing to synchronous call from asynchronous queuing to prevent the
190 // loss of command if the queue is full or the queue is lost in case of
191 // failure.
192 try {
193 new StartXCommand(jobId).call();
194 }
195 catch (CommandException e) {
196 throw new DagEngineException(e);
197 }
198 }
199
200 /**
201 * Resume a job.
202 *
203 * @param jobId job Id.
204 * @throws DagEngineException thrown if the job could not be resumed.
205 */
206 @Override
207 public void resume(String jobId) throws DagEngineException {
208 // Changing to synchronous call from asynchronous queuing to prevent the
209 // loss of command if the queue is full or the queue is lost in case of
210 // failure.
211 try {
212 new ResumeXCommand(jobId).call();
213 }
214 catch (CommandException e) {
215 throw new DagEngineException(e);
216 }
217 }
218
219 /**
220 * Suspend a job.
221 *
222 * @param jobId job Id.
223 * @throws DagEngineException thrown if the job could not be suspended.
224 */
225 @Override
226 public void suspend(String jobId) throws DagEngineException {
227 // Changing to synchronous call from asynchronous queuing to prevent the
228 // loss of command if the queue is full or the queue is lost in case of
229 // failure.
230 try {
231 new SuspendXCommand(jobId).call();
232 }
233 catch (CommandException e) {
234 throw new DagEngineException(e);
235 }
236 }
237
238 /**
239 * Kill a job.
240 *
241 * @param jobId job Id.
242 * @throws DagEngineException thrown if the job could not be killed.
243 */
244 @Override
245 public void kill(String jobId) throws DagEngineException {
246 // Changing to synchronous call from asynchronous queuing to prevent the
247 // loss of command if the queue is full or the queue is lost in case of
248 // failure.
249 try {
250 new KillXCommand(jobId).call();
251 LOG.info("User " + user + " killed the WF job " + jobId);
252 }
253 catch (CommandException e) {
254 throw new DagEngineException(e);
255 }
256 }
257
258 /* (non-Javadoc)
259 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
260 */
261 @Override
262 public void change(String jobId, String changeValue) throws DagEngineException {
263 // This code should not be reached.
264 throw new DagEngineException(ErrorCode.E1017);
265 }
266
267 /**
268 * Rerun a job.
269 *
270 * @param jobId job Id to rerun.
271 * @param conf configuration information for the rerun.
272 * @throws DagEngineException thrown if the job could not be rerun.
273 */
274 @Override
275 public void reRun(String jobId, Configuration conf) throws DagEngineException {
276 try {
277 validateReRunConfiguration(conf);
278 new ReRunXCommand(jobId, conf).call();
279 start(jobId);
280 }
281 catch (CommandException ex) {
282 throw new DagEngineException(ex);
283 }
284 }
285
286 private void validateReRunConfiguration(Configuration conf) throws DagEngineException {
287 if (conf.get(OozieClient.APP_PATH) == null) {
288 throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
289 }
290 if (conf.get(OozieClient.RERUN_SKIP_NODES) == null && conf.get(OozieClient.RERUN_FAIL_NODES) == null) {
291 throw new DagEngineException(ErrorCode.E0401, OozieClient.RERUN_SKIP_NODES + " OR "
292 + OozieClient.RERUN_FAIL_NODES);
293 }
294 if (conf.get(OozieClient.RERUN_SKIP_NODES) != null && conf.get(OozieClient.RERUN_FAIL_NODES) != null) {
295 throw new DagEngineException(ErrorCode.E0404, OozieClient.RERUN_SKIP_NODES + " OR "
296 + OozieClient.RERUN_FAIL_NODES);
297 }
298 }
299
300 /**
301 * Process an action callback.
302 *
303 * @param actionId the action Id.
304 * @param externalStatus the action external status.
305 * @param actionData the action output data, <code>null</code> if none.
306 * @throws DagEngineException thrown if the callback could not be processed.
307 */
308 public void processCallback(String actionId, String externalStatus, Properties actionData)
309 throws DagEngineException {
310 XLog.Info.get().clearParameter(XLogService.GROUP);
311 XLog.Info.get().clearParameter(XLogService.USER);
312 XCallable<Void> command = null;
313
314 command = new CompletedActionXCommand(actionId, externalStatus,
315 actionData, HIGH_PRIORITY);
316 if (!Services.get().get(CallableQueueService.class).queue(command)) {
317 LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
318 }
319 }
320
321 /**
322 * Return the info about a job.
323 *
324 * @param jobId job Id.
325 * @return the workflow job info.
326 * @throws DagEngineException thrown if the job info could not be obtained.
327 */
328 @Override
329 public WorkflowJob getJob(String jobId) throws DagEngineException {
330 try {
331 return new JobXCommand(jobId).call();
332 }
333 catch (CommandException ex) {
334 throw new DagEngineException(ex);
335 }
336 }
337
338 /**
339 * Return the info about a job with actions subset.
340 *
341 * @param jobId job Id
342 * @param start starting from this index in the list of actions belonging to the job
343 * @param length number of actions to be returned
344 * @return the workflow job info.
345 * @throws DagEngineException thrown if the job info could not be obtained.
346 */
347 @Override
348 public WorkflowJob getJob(String jobId, int start, int length) throws DagEngineException {
349 try {
350 return new JobXCommand(jobId, start, length).call();
351 }
352 catch (CommandException ex) {
353 throw new DagEngineException(ex);
354 }
355 }
356
357 /**
358 * Return the a job definition.
359 *
360 * @param jobId job Id.
361 * @return the job definition.
362 * @throws DagEngineException thrown if the job definition could no be obtained.
363 */
364 @Override
365 public String getDefinition(String jobId) throws DagEngineException {
366 try {
367 return new DefinitionXCommand(jobId).call();
368 }
369 catch (CommandException ex) {
370 throw new DagEngineException(ex);
371 }
372 }
373
374 /**
375 * Stream the log of a job.
376 *
377 * @param jobId job Id.
378 * @param writer writer to stream the log to.
379 * @throws IOException thrown if the log cannot be streamed.
380 * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
381 */
382 @Override
383 public void streamLog(String jobId, Writer writer) throws IOException, DagEngineException {
384 XLogStreamer.Filter filter = new XLogStreamer.Filter();
385 filter.setParameter(DagXLogInfoService.JOB, jobId);
386 WorkflowJob job = getJob(jobId);
387 Date lastTime = job.getEndTime();
388 if (lastTime == null) {
389 lastTime = job.getLastModifiedTime();
390 }
391 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer);
392 }
393
394 private static final Set<String> FILTER_NAMES = new HashSet<String>();
395
396 static {
397 FILTER_NAMES.add(OozieClient.FILTER_USER);
398 FILTER_NAMES.add(OozieClient.FILTER_NAME);
399 FILTER_NAMES.add(OozieClient.FILTER_GROUP);
400 FILTER_NAMES.add(OozieClient.FILTER_STATUS);
401 FILTER_NAMES.add(OozieClient.FILTER_ID);
402 }
403
404 /**
405 * Validate a jobs filter.
406 *
407 * @param filter filter to validate.
408 * @return the parsed filter.
409 * @throws DagEngineException thrown if the filter is invalid.
410 */
411 protected Map<String, List<String>> parseFilter(String filter) throws DagEngineException {
412 Map<String, List<String>> map = new HashMap<String, List<String>>();
413 if (filter != null) {
414 StringTokenizer st = new StringTokenizer(filter, ";");
415 while (st.hasMoreTokens()) {
416 String token = st.nextToken();
417 if (token.contains("=")) {
418 String[] pair = token.split("=");
419 if (pair.length != 2) {
420 throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
421 }
422 if (!FILTER_NAMES.contains(pair[0])) {
423 throw new DagEngineException(ErrorCode.E0420, filter, XLog
424 .format("invalid name [{0}]", pair[0]));
425 }
426 if (pair[0].equals("status")) {
427 try {
428 WorkflowJob.Status.valueOf(pair[1]);
429 }
430 catch (IllegalArgumentException ex) {
431 throw new DagEngineException(ErrorCode.E0420, filter, XLog.format("invalid status [{0}]",
432 pair[1]));
433 }
434 }
435 List<String> list = map.get(pair[0]);
436 if (list == null) {
437 list = new ArrayList<String>();
438 map.put(pair[0], list);
439 }
440 list.add(pair[1]);
441 }
442 else {
443 throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
444 }
445 }
446 }
447 return map;
448 }
449
450 /**
451 * Return the info about a set of jobs.
452 *
453 * @param filter job filter. Refer to the {@link org.apache.oozie.client.OozieClient} for the filter syntax.
454 * @param start offset, base 1.
455 * @param len number of jobs to return.
456 * @return job info for all matching jobs, the jobs don't contain node action information.
457 * @throws DagEngineException thrown if the jobs info could not be obtained.
458 */
459 public WorkflowsInfo getJobs(String filter, int start, int len) throws DagEngineException {
460 Map<String, List<String>> filterList = parseFilter(filter);
461 try {
462 return new JobsXCommand(filterList, start, len).call();
463 }
464 catch (CommandException dce) {
465 throw new DagEngineException(dce);
466 }
467 }
468
469 /**
470 * Return the workflow Job ID for an external ID. <p/> This is reverse lookup for recovery purposes.
471 *
472 * @param externalId external ID provided at job submission time.
473 * @return the associated workflow job ID if any, <code>null</code> if none.
474 * @throws DagEngineException thrown if the lookup could not be done.
475 */
476 @Override
477 public String getJobIdForExternalId(String externalId) throws DagEngineException {
478 try {
479 return new ExternalIdXCommand(externalId).call();
480 }
481 catch (CommandException dce) {
482 throw new DagEngineException(dce);
483 }
484 }
485
486 @Override
487 public CoordinatorJob getCoordJob(String jobId) throws BaseEngineException {
488 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
489 }
490
491 @Override
492 public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc) throws BaseEngineException {
493 throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
494 }
495
496 public WorkflowActionBean getWorkflowAction(String actionId) throws BaseEngineException {
497 try {
498 return new WorkflowActionInfoXCommand(actionId).call();
499 }
500 catch (CommandException ex) {
501 throw new BaseEngineException(ex);
502 }
503 }
504
505 /* (non-Javadoc)
506 * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
507 */
508 @Override
509 public String dryRunSubmit(Configuration conf) throws BaseEngineException {
510 try {
511 SubmitXCommand submit = new SubmitXCommand(true, conf);
512 return submit.call();
513 } catch (CommandException ex) {
514 throw new DagEngineException(ex);
515 }
516 }
517 }