This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.URI;
023 import java.net.URISyntaxException;
024 import java.util.Date;
025 import java.util.Properties;
026 import java.util.Set;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.oozie.DagELFunctions;
032 import org.apache.oozie.ErrorCode;
033 import org.apache.oozie.WorkflowActionBean;
034 import org.apache.oozie.WorkflowJobBean;
035 import org.apache.oozie.action.ActionExecutor;
036 import org.apache.oozie.client.WorkflowAction;
037 import org.apache.oozie.client.WorkflowJob;
038 import org.apache.oozie.command.CommandException;
039 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
040 import org.apache.oozie.service.CallbackService;
041 import org.apache.oozie.service.ELService;
042 import org.apache.oozie.service.HadoopAccessorException;
043 import org.apache.oozie.service.HadoopAccessorService;
044 import org.apache.oozie.service.JPAService;
045 import org.apache.oozie.service.LiteWorkflowStoreService;
046 import org.apache.oozie.service.Services;
047 import org.apache.oozie.util.ELEvaluator;
048 import org.apache.oozie.util.InstrumentUtils;
049 import org.apache.oozie.util.Instrumentation;
050 import org.apache.oozie.util.XConfiguration;
051 import org.apache.oozie.workflow.WorkflowException;
052 import org.apache.oozie.workflow.WorkflowInstance;
053 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
054
055 /**
056 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
057 * attempting to start or end an action.
058 */
059 public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
060 private static final String INSTRUMENTATION_GROUP = "action.executors";
061
062 protected static final String INSTR_FAILED_JOBS_COUNTER = "failed";
063
064 protected static final String RECOVERY_ID_SEPARATOR = "@";
065
/**
 * Creates an action command; all three arguments are handed unchanged to the parent command constructor.
 *
 * @param name the command name.
 * @param type the command type.
 * @param priority the command priority.
 */
public ActionXCommand(final String name, final String type, final int priority) {
    super(name, type, priority);
}
069
070 /**
071 * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
072 * attempts have been made. Otherwise returns false.
073 *
074 * @param context the execution context.
075 * @param executor the executor instance being used.
076 * @param status the status to be set for the action.
077 * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
078 * maximum number of configured retries.
079 * @throws CommandException thrown if unable to handle transient
080 */
081 protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
082 WorkflowAction.Status status) throws CommandException {
083 LOG.debug("Attempting to retry");
084 ActionExecutorContext aContext = (ActionExecutorContext) context;
085 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
086 incrActionErrorCounter(action.getType(), "transient", 1);
087
088 int actionRetryCount = action.getRetries();
089 if (actionRetryCount >= executor.getMaxRetries()) {
090 LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
091 return false;
092 }
093 else {
094 action.setStatus(status);
095 action.setPending();
096 action.incRetries();
097 long retryDelayMillis = executor.getRetryInterval() * 1000;
098 action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
099 LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
100 this.resetUsed();
101 queue(this, retryDelayMillis);
102 return true;
103 }
104 }
105
106 /**
107 * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
108 * set pending flag of action to false
109 *
110 * @param context the execution context.
111 * @param executor the executor instance being used.
112 * @param status the status to be set for the action.
113 * @throws CommandException thrown if unable to suspend job
114 */
115 protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
116 WorkflowAction.Status status) throws CommandException {
117 ActionExecutorContext aContext = (ActionExecutorContext) context;
118 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
119 incrActionErrorCounter(action.getType(), "nontransient", 1);
120 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
121 String id = workflow.getId();
122 action.setStatus(status);
123 action.resetPendingOnly();
124 LOG.warn("Suspending Workflow Job id=" + id);
125 try {
126 SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
127 }
128 catch (Exception e) {
129 throw new CommandException(ErrorCode.E0727, id, e.getMessage());
130 }
131 finally {
132 // update coordinator action
133 new CoordActionUpdateXCommand(workflow, 3).call();
134 }
135 }
136
137 /**
138 * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
139 * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
140 * </p>
141 *
142 * @param context the execution context.
143 * @param executor the executor instance being used.
144 * @param message
145 * @param isStart whether the error was generated while starting or ending an action.
146 * @param status the status to be set for the action.
147 * @throws CommandException thrown if unable to handle action error
148 */
149 protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
150 boolean isStart, WorkflowAction.Status status) throws CommandException {
151 LOG.warn("Setting Action Status to [{0}]", status);
152 ActionExecutorContext aContext = (ActionExecutorContext) context;
153 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
154
155 if (!handleUserRetry(action)) {
156 incrActionErrorCounter(action.getType(), "error", 1);
157 action.setPending();
158 if (isStart) {
159 action.setExecutionData(message, null);
160 queue(new ActionEndXCommand(action.getId(), action.getType()));
161 }
162 else {
163 action.setEndData(status, WorkflowAction.Status.ERROR.toString());
164 }
165 }
166 }
167
168 /**
169 * Fail the job due to failed action
170 *
171 * @param context the execution context.
172 * @throws CommandException thrown if unable to fail job
173 */
174 public void failJob(ActionExecutor.Context context) throws CommandException {
175 ActionExecutorContext aContext = (ActionExecutorContext) context;
176 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
177 failJob(context, action);
178 }
179
180 /**
181 * Fail the job due to failed action
182 *
183 * @param context the execution context.
184 * @param action the action that caused the workflow to fail
185 * @throws CommandException thrown if unable to fail job
186 */
187 public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
188 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
189 if (!handleUserRetry(action)) {
190 incrActionErrorCounter(action.getType(), "failed", 1);
191 LOG.warn("Failing Job due to failed action [{0}]", action.getName());
192 try {
193 workflow.getWorkflowInstance().fail(action.getName());
194 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
195 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
196 workflow.setWorkflowInstance(wfInstance);
197 workflow.setStatus(WorkflowJob.Status.FAILED);
198 action.setStatus(WorkflowAction.Status.FAILED);
199 action.resetPending();
200 queue(new NotificationXCommand(workflow, action));
201 queue(new KillXCommand(workflow.getId()));
202 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
203 }
204 catch (WorkflowException ex) {
205 throw new CommandException(ex);
206 }
207 }
208 }
209
210 /**
211 * Execute retry for action if this action is eligible for user-retry
212 *
213 * @param context the execution context.
214 * @return true if user-retry has to be handled for this action
215 * @throws CommandException thrown if unable to fail job
216 */
217 public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
218 String errorCode = action.getErrorCode();
219 Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
220
221 if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
222 LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
223 + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
224 .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
225 int interval = action.getUserRetryInterval() * 60 * 1000;
226 action.setStatus(WorkflowAction.Status.USER_RETRY);
227 action.incrmentUserRetryCount();
228 action.setPending();
229 queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
230 return true;
231 }
232 return false;
233 }
234
235 /*
236 * In case of action error increment the error count for instrumentation
237 */
238 private void incrActionErrorCounter(String type, String error, int count) {
239 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
240 }
241
242 /**
243 * Increment the action counter in the instrumentation log. indicating how
244 * many times the action was executed since the start Oozie server
245 */
246 protected void incrActionCounter(String type, int count) {
247 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
248 }
249
250 /**
251 * Adding a cron for the instrumentation time for the given Instrumentation
252 * group
253 */
254 protected void addActionCron(String type, Instrumentation.Cron cron) {
255 getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
256 }
257
258 /**
259 * Workflow action executor context
260 *
261 */
262 public static class ActionExecutorContext implements ActionExecutor.Context {
263 private final WorkflowJobBean workflow;
264 private Configuration protoConf;
265 private final WorkflowActionBean action;
266 private final boolean isRetry;
267 private final boolean isUserRetry;
268 private boolean started;
269 private boolean ended;
270 private boolean executed;
271
272 /**
273 * Constructing the ActionExecutorContext, setting the private members
274 * and constructing the proto configuration
275 */
276 public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
277 this.workflow = workflow;
278 this.action = action;
279 this.isRetry = isRetry;
280 this.isUserRetry = isUserRetry;
281 try {
282 protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
283 }
284 catch (IOException ex) {
285 throw new RuntimeException("It should not happen", ex);
286 }
287 }
288
289 /*
290 * (non-Javadoc)
291 * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
292 */
293 public String getCallbackUrl(String externalStatusVar) {
294 return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
295 }
296
297 /*
298 * (non-Javadoc)
299 * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
300 */
301 public Configuration getProtoActionConf() {
302 return protoConf;
303 }
304
305 /*
306 * (non-Javadoc)
307 * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
308 */
309 public WorkflowJob getWorkflow() {
310 return workflow;
311 }
312
313 /**
314 * Returns the workflow action of the given action context
315 *
316 * @return the workflow action of the given action context
317 */
318 public WorkflowAction getAction() {
319 return action;
320 }
321
322 /*
323 * (non-Javadoc)
324 * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
325 */
326 public ELEvaluator getELEvaluator() {
327 ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
328 DagELFunctions.configureEvaluator(evaluator, workflow, action);
329 return evaluator;
330 }
331
332 /*
333 * (non-Javadoc)
334 * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
335 */
336 public void setVar(String name, String value) {
337 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
338 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
339 wfInstance.setVar(name, value);
340 workflow.setWorkflowInstance(wfInstance);
341 }
342
343 /*
344 * (non-Javadoc)
345 * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
346 */
347 public String getVar(String name) {
348 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
349 return workflow.getWorkflowInstance().getVar(name);
350 }
351
352 /*
353 * (non-Javadoc)
354 * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
355 */
356 public void setStartData(String externalId, String trackerUri, String consoleUrl) {
357 action.setStartData(externalId, trackerUri, consoleUrl);
358 started = true;
359 }
360
361 /**
362 * Setting the start time of the action
363 */
364 public void setStartTime() {
365 Date now = new Date();
366 action.setStartTime(now);
367 }
368
369 /*
370 * (non-Javadoc)
371 * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
372 */
373 public void setExecutionData(String externalStatus, Properties actionData) {
374 action.setExecutionData(externalStatus, actionData);
375 executed = true;
376 }
377
378 /*
379 * (non-Javadoc)
380 * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
381 */
382 public void setExecutionStats(String jsonStats) {
383 action.setExecutionStats(jsonStats);
384 executed = true;
385 }
386
387 /*
388 * (non-Javadoc)
389 * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
390 */
391 public void setExternalChildIDs(String externalChildIDs) {
392 action.setExternalChildIDs(externalChildIDs);
393 executed = true;
394 }
395
396 /*
397 * (non-Javadoc)
398 * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
399 */
400 public void setEndData(WorkflowAction.Status status, String signalValue) {
401 action.setEndData(status, signalValue);
402 ended = true;
403 }
404
405 /*
406 * (non-Javadoc)
407 * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
408 */
409 public boolean isRetry() {
410 return isRetry;
411 }
412
413 /**
414 * Return if the executor invocation is a user retry or not.
415 *
416 * @return if the executor invocation is a user retry or not.
417 */
418 public boolean isUserRetry() {
419 return isUserRetry;
420 }
421
422 /**
423 * Returns whether setStartData has been called or not.
424 *
425 * @return true if start completion info has been set.
426 */
427 public boolean isStarted() {
428 return started;
429 }
430
431 /**
432 * Returns whether setExecutionData has been called or not.
433 *
434 * @return true if execution completion info has been set, otherwise false.
435 */
436 public boolean isExecuted() {
437 return executed;
438 }
439
440 /**
441 * Returns whether setEndData has been called or not.
442 *
443 * @return true if end completion info has been set.
444 */
445 public boolean isEnded() {
446 return ended;
447 }
448
449 public void setExternalStatus(String externalStatus) {
450 action.setExternalStatus(externalStatus);
451 }
452
453 @Override
454 public String getRecoveryId() {
455 return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
456 }
457
458 /* (non-Javadoc)
459 * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
460 */
461 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
462 String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
463 FileSystem fs = getAppFileSystem();
464 String actionDirPath = Services.get().getSystemId() + "/" + name;
465 Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
466 return fqActionDir;
467 }
468
469 /* (non-Javadoc)
470 * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
471 */
472 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
473 WorkflowJob workflow = getWorkflow();
474 URI uri = new URI(getWorkflow().getAppPath());
475 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
476 Configuration fsConf = has.createJobConf(uri.getAuthority());
477 return has.createFileSystem(workflow.getUser(), uri, fsConf);
478
479 }
480
481 /* (non-Javadoc)
482 * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
483 */
484 @Override
485 public void setErrorInfo(String str, String exMsg) {
486 action.setErrorInfo(str, exMsg);
487 }
488 }
489
490 }