This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.URI;
023 import java.net.URISyntaxException;
024 import java.util.Date;
025 import java.util.Properties;
026 import java.util.Set;
027
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.oozie.DagELFunctions;
032 import org.apache.oozie.ErrorCode;
033 import org.apache.oozie.WorkflowActionBean;
034 import org.apache.oozie.WorkflowJobBean;
035 import org.apache.oozie.action.ActionExecutor;
036 import org.apache.oozie.client.WorkflowAction;
037 import org.apache.oozie.client.WorkflowJob;
038 import org.apache.oozie.command.CommandException;
039 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
040 import org.apache.oozie.service.CallbackService;
041 import org.apache.oozie.service.ELService;
042 import org.apache.oozie.service.HadoopAccessorException;
043 import org.apache.oozie.service.HadoopAccessorService;
044 import org.apache.oozie.service.JPAService;
045 import org.apache.oozie.service.LiteWorkflowStoreService;
046 import org.apache.oozie.service.Services;
047 import org.apache.oozie.util.ELEvaluator;
048 import org.apache.oozie.util.InstrumentUtils;
049 import org.apache.oozie.util.Instrumentation;
050 import org.apache.oozie.util.XConfiguration;
051 import org.apache.oozie.workflow.WorkflowException;
052 import org.apache.oozie.workflow.WorkflowInstance;
053 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
054
055 /**
056 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
057 * attempting to start or end an action.
058 */
059 public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
060 private static final String INSTRUMENTATION_GROUP = "action.executors";
061
062 protected static final String INSTR_FAILED_JOBS_COUNTER = "failed";
063
064 protected static final String RECOVERY_ID_SEPARATOR = "@";
065
/**
 * Creates an action command; all three arguments are forwarded unchanged to the
 * {@link WorkflowXCommand} constructor.
 *
 * @param name the command name.
 * @param type the command type.
 * @param priority the command priority.
 */
public ActionXCommand(String name, String type, int priority) {
    super(name, type, priority);
}
069
070 /**
071 * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
072 * attempts have been made. Otherwise returns false.
073 *
074 * @param context the execution context.
075 * @param executor the executor instance being used.
076 * @param status the status to be set for the action.
077 * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
078 * maximum number of configured retries.
079 * @throws CommandException thrown if unable to handle transient
080 */
081 protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
082 WorkflowAction.Status status) throws CommandException {
083 LOG.debug("Attempting to retry");
084 ActionExecutorContext aContext = (ActionExecutorContext) context;
085 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
086 incrActionErrorCounter(action.getType(), "transient", 1);
087
088 int actionRetryCount = action.getRetries();
089 if (actionRetryCount >= executor.getMaxRetries()) {
090 LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
091 return false;
092 }
093 else {
094 action.setStatus(status);
095 action.setPending();
096 action.incRetries();
097 long retryDelayMillis = executor.getRetryInterval() * 1000;
098 action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
099 LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
100 this.resetUsed();
101 queue(this, retryDelayMillis);
102 return true;
103 }
104 }
105
106 /**
107 * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
108 * set pending flag of action to false
109 *
110 * @param context the execution context.
111 * @param executor the executor instance being used.
112 * @param status the status to be set for the action.
113 * @throws CommandException thrown if unable to suspend job
114 */
115 protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
116 WorkflowAction.Status status) throws CommandException {
117 ActionExecutorContext aContext = (ActionExecutorContext) context;
118 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
119 incrActionErrorCounter(action.getType(), "nontransient", 1);
120 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
121 String id = workflow.getId();
122 action.setStatus(status);
123 action.resetPendingOnly();
124 LOG.warn("Suspending Workflow Job id=" + id);
125 try {
126 SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
127 }
128 catch (Exception e) {
129 throw new CommandException(ErrorCode.E0727, id, e.getMessage());
130 }
131 finally {
132 // update coordinator action
133 new CoordActionUpdateXCommand(workflow, 3).call();
134 }
135 }
136
137 /**
138 * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
139 * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
140 * </p>
141 *
142 * @param context the execution context.
143 * @param executor the executor instance being used.
144 * @param message
145 * @param isStart whether the error was generated while starting or ending an action.
146 * @param status the status to be set for the action.
147 * @throws CommandException thrown if unable to handle action error
148 */
149 protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
150 boolean isStart, WorkflowAction.Status status) throws CommandException {
151 LOG.warn("Setting Action Status to [{0}]", status);
152 ActionExecutorContext aContext = (ActionExecutorContext) context;
153 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
154
155 if (!handleUserRetry(action)) {
156 incrActionErrorCounter(action.getType(), "error", 1);
157 action.setPending();
158 if (isStart) {
159 action.setExecutionData(message, null);
160 queue(new ActionEndXCommand(action.getId(), action.getType()));
161 }
162 else {
163 action.setEndData(status, WorkflowAction.Status.ERROR.toString());
164 }
165 }
166 }
167
168 /**
169 * Fail the job due to failed action
170 *
171 * @param context the execution context.
172 * @throws CommandException thrown if unable to fail job
173 */
174 public void failJob(ActionExecutor.Context context) throws CommandException {
175 ActionExecutorContext aContext = (ActionExecutorContext) context;
176 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
177 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
178
179 if (!handleUserRetry(action)) {
180 incrActionErrorCounter(action.getType(), "failed", 1);
181 LOG.warn("Failing Job due to failed action [{0}]", action.getName());
182 try {
183 workflow.getWorkflowInstance().fail(action.getName());
184 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
185 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
186 workflow.setWorkflowInstance(wfInstance);
187 workflow.setStatus(WorkflowJob.Status.FAILED);
188 action.setStatus(WorkflowAction.Status.FAILED);
189 action.resetPending();
190 queue(new NotificationXCommand(workflow, action));
191 queue(new KillXCommand(workflow.getId()));
192 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
193 }
194 catch (WorkflowException ex) {
195 throw new CommandException(ex);
196 }
197 }
198 }
199
200 /**
201 * Execute retry for action if this action is eligible for user-retry
202 *
203 * @param context the execution context.
204 * @return true if user-retry has to be handled for this action
205 * @throws CommandException thrown if unable to fail job
206 */
207 public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
208 String errorCode = action.getErrorCode();
209 Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
210
211 if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
212 LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
213 + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
214 .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
215 int interval = action.getUserRetryInterval() * 60 * 1000;
216 action.setStatus(WorkflowAction.Status.USER_RETRY);
217 action.incrmentUserRetryCount();
218 action.setPending();
219 queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
220 return true;
221 }
222 return false;
223 }
224
225 /*
226 * In case of action error increment the error count for instrumentation
227 */
228 private void incrActionErrorCounter(String type, String error, int count) {
229 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
230 }
231
232 /**
233 * Increment the action counter in the instrumentation log. indicating how
234 * many times the action was executed since the start Oozie server
235 */
236 protected void incrActionCounter(String type, int count) {
237 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
238 }
239
240 /**
241 * Adding a cron for the instrumentation time for the given Instrumentation
242 * group
243 */
244 protected void addActionCron(String type, Instrumentation.Cron cron) {
245 getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
246 }
247
248 /**
249 * Workflow action executor context
250 *
251 */
252 public static class ActionExecutorContext implements ActionExecutor.Context {
253 private final WorkflowJobBean workflow;
254 private Configuration protoConf;
255 private final WorkflowActionBean action;
256 private final boolean isRetry;
257 private final boolean isUserRetry;
258 private boolean started;
259 private boolean ended;
260 private boolean executed;
261
262 /**
263 * Constructing the ActionExecutorContext, setting the private members
264 * and constructing the proto configuration
265 */
266 public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
267 this.workflow = workflow;
268 this.action = action;
269 this.isRetry = isRetry;
270 this.isUserRetry = isUserRetry;
271 try {
272 protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
273 }
274 catch (IOException ex) {
275 throw new RuntimeException("It should not happen", ex);
276 }
277 }
278
279 /*
280 * (non-Javadoc)
281 * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
282 */
283 public String getCallbackUrl(String externalStatusVar) {
284 return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
285 }
286
287 /*
288 * (non-Javadoc)
289 * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
290 */
291 public Configuration getProtoActionConf() {
292 return protoConf;
293 }
294
295 /*
296 * (non-Javadoc)
297 * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
298 */
299 public WorkflowJob getWorkflow() {
300 return workflow;
301 }
302
303 /**
304 * Returns the workflow action of the given action context
305 *
306 * @return the workflow action of the given action context
307 */
308 public WorkflowAction getAction() {
309 return action;
310 }
311
312 /*
313 * (non-Javadoc)
314 * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
315 */
316 public ELEvaluator getELEvaluator() {
317 ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
318 DagELFunctions.configureEvaluator(evaluator, workflow, action);
319 return evaluator;
320 }
321
322 /*
323 * (non-Javadoc)
324 * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
325 */
326 public void setVar(String name, String value) {
327 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
328 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
329 wfInstance.setVar(name, value);
330 workflow.setWorkflowInstance(wfInstance);
331 }
332
333 /*
334 * (non-Javadoc)
335 * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
336 */
337 public String getVar(String name) {
338 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
339 return workflow.getWorkflowInstance().getVar(name);
340 }
341
342 /*
343 * (non-Javadoc)
344 * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
345 */
346 public void setStartData(String externalId, String trackerUri, String consoleUrl) {
347 action.setStartData(externalId, trackerUri, consoleUrl);
348 started = true;
349 }
350
351 /**
352 * Setting the start time of the action
353 */
354 public void setStartTime() {
355 Date now = new Date();
356 action.setStartTime(now);
357 }
358
359 /*
360 * (non-Javadoc)
361 * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
362 */
363 public void setExecutionData(String externalStatus, Properties actionData) {
364 action.setExecutionData(externalStatus, actionData);
365 executed = true;
366 }
367
368 /*
369 * (non-Javadoc)
370 * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
371 */
372 public void setExecutionStats(String jsonStats) {
373 action.setExecutionStats(jsonStats);
374 executed = true;
375 }
376
377 /*
378 * (non-Javadoc)
379 * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
380 */
381 public void setExternalChildIDs(String externalChildIDs) {
382 action.setExternalChildIDs(externalChildIDs);
383 executed = true;
384 }
385
386 /*
387 * (non-Javadoc)
388 * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
389 */
390 public void setEndData(WorkflowAction.Status status, String signalValue) {
391 action.setEndData(status, signalValue);
392 ended = true;
393 }
394
395 /*
396 * (non-Javadoc)
397 * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
398 */
399 public boolean isRetry() {
400 return isRetry;
401 }
402
403 /**
404 * Return if the executor invocation is a user retry or not.
405 *
406 * @return if the executor invocation is a user retry or not.
407 */
408 public boolean isUserRetry() {
409 return isUserRetry;
410 }
411
412 /**
413 * Returns whether setStartData has been called or not.
414 *
415 * @return true if start completion info has been set.
416 */
417 public boolean isStarted() {
418 return started;
419 }
420
421 /**
422 * Returns whether setExecutionData has been called or not.
423 *
424 * @return true if execution completion info has been set, otherwise false.
425 */
426 public boolean isExecuted() {
427 return executed;
428 }
429
430 /**
431 * Returns whether setEndData has been called or not.
432 *
433 * @return true if end completion info has been set.
434 */
435 public boolean isEnded() {
436 return ended;
437 }
438
439 public void setExternalStatus(String externalStatus) {
440 action.setExternalStatus(externalStatus);
441 }
442
443 @Override
444 public String getRecoveryId() {
445 return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
446 }
447
448 /* (non-Javadoc)
449 * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
450 */
451 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
452 String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
453 FileSystem fs = getAppFileSystem();
454 String actionDirPath = Services.get().getSystemId() + "/" + name;
455 Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
456 return fqActionDir;
457 }
458
459 /* (non-Javadoc)
460 * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
461 */
462 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
463 WorkflowJob workflow = getWorkflow();
464 URI uri = new URI(getWorkflow().getAppPath());
465 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
466 Configuration fsConf = has.createJobConf(uri.getAuthority());
467 return has.createFileSystem(workflow.getUser(), uri, fsConf);
468
469 }
470
471 /* (non-Javadoc)
472 * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
473 */
474 @Override
475 public void setErrorInfo(String str, String exMsg) {
476 action.setErrorInfo(str, exMsg);
477 }
478 }
479
480 }