This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.URI;
023 import java.net.URISyntaxException;
024 import java.util.Date;
025 import java.util.Properties;
026
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.hadoop.fs.FileSystem;
029 import org.apache.hadoop.fs.Path;
030 import org.apache.oozie.DagELFunctions;
031 import org.apache.oozie.WorkflowActionBean;
032 import org.apache.oozie.WorkflowJobBean;
033 import org.apache.oozie.action.ActionExecutor;
034 import org.apache.oozie.client.WorkflowAction;
035 import org.apache.oozie.client.WorkflowJob;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.service.CallbackService;
038 import org.apache.oozie.service.ELService;
039 import org.apache.oozie.service.HadoopAccessorException;
040 import org.apache.oozie.service.HadoopAccessorService;
041 import org.apache.oozie.service.Services;
042 import org.apache.oozie.store.StoreException;
043 import org.apache.oozie.store.WorkflowStore;
044 import org.apache.oozie.util.ELEvaluator;
045 import org.apache.oozie.util.Instrumentation;
046 import org.apache.oozie.util.XConfiguration;
047 import org.apache.oozie.util.XLog;
048 import org.apache.oozie.workflow.WorkflowException;
049 import org.apache.oozie.workflow.WorkflowInstance;
050 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
051
052 /**
053 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
054 * attempting to start or end an action.
055 */
056 public abstract class ActionCommand<T> extends WorkflowCommand<Void> {
057 private static final String INSTRUMENTATION_GROUP = "action.executors";
058
059 protected static final String INSTR_FAILED_JOBS_COUNTER = "failed";
060
061 protected static final String RECOVERY_ID_SEPARATOR = "@";
062
063 public ActionCommand(String name, String type, int priority) {
064 super(name, type, priority, XLog.STD);
065 }
066
067 /**
068 * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
069 * attempts have been made. Otherwise returns false.
070 *
071 * @param context the execution context.
072 * @param executor the executor instance being used.
073 * @param status the status to be set for the action.
074 * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
075 * maximum number of configured retries.
076 * @throws StoreException
077 * @throws org.apache.oozie.command.CommandException
078 */
079 protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor, WorkflowAction.Status status)
080 throws StoreException, CommandException {
081 XLog.getLog(getClass()).debug("Attempting to retry");
082 ActionExecutorContext aContext = (ActionExecutorContext) context;
083 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
084 incrActionErrorCounter(action.getType(), "transient", 1);
085
086 int actionRetryCount = action.getRetries();
087 if (actionRetryCount >= executor.getMaxRetries()) {
088 XLog.getLog(getClass()).warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
089 return false;
090 }
091 else {
092 action.setStatus(status);
093 action.setPending();
094 action.incRetries();
095 long retryDelayMillis = executor.getRetryInterval() * 1000;
096 action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
097 XLog.getLog(getClass()).info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds",
098 actionRetryCount + 1, retryDelayMillis);
099 queueCallable(this, retryDelayMillis);
100 return true;
101 }
102 }
103
104 /**
105 * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL
106 * and set pending flag of action to false
107 *
108 * @param store WorkflowStore
109 * @param context the execution context.
110 * @param executor the executor instance being used.
111 * @param status the status to be set for the action.
112 * @throws StoreException
113 * @throws CommandException
114 */
115 protected void handleNonTransient(WorkflowStore store, ActionExecutor.Context context, ActionExecutor executor,
116 WorkflowAction.Status status)
117 throws StoreException, CommandException {
118 ActionExecutorContext aContext = (ActionExecutorContext) context;
119 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
120 incrActionErrorCounter(action.getType(), "nontransient", 1);
121 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
122 String id = workflow.getId();
123 action.setStatus(status);
124 action.resetPendingOnly();
125 XLog.getLog(getClass()).warn("Suspending Workflow Job id=" + id);
126 try {
127 SuspendCommand.suspendJob(store, workflow, id, action.getId());
128 }
129 catch (WorkflowException e) {
130 throw new CommandException(e);
131 }
132 }
133
134 /**
135 * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
136 * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
137 * </p>
138 *
139 * @param context the execution context.
140 * @param executor the executor instance being used.
141 * @param message
142 * @param isStart whether the error was generated while starting or ending an action.
143 * @param status the status to be set for the action.
144 * @throws org.apache.oozie.command.CommandException
145 */
146 protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
147 boolean isStart, WorkflowAction.Status status) throws CommandException {
148 XLog.getLog(getClass()).warn("Setting Action Status to [{0}]", status);
149 ActionExecutorContext aContext = (ActionExecutorContext) context;
150 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
151 incrActionErrorCounter(action.getType(), "error", 1);
152 action.setPending();
153 if (isStart) {
154 action.setExecutionData(message, null);
155 queueCallable(new ActionEndCommand(action.getId(), action.getType()));
156 }
157 else {
158 action.setEndData(status, WorkflowAction.Status.ERROR.toString());
159 }
160 }
161
162 public void failJob(ActionExecutor.Context context) throws CommandException {
163 ActionExecutorContext aContext = (ActionExecutorContext) context;
164 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
165 incrActionErrorCounter(action.getType(), "failed", 1);
166 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
167 XLog.getLog(getClass()).warn("Failing Job due to failed action [{0}]", action.getName());
168 try {
169 workflow.getWorkflowInstance().fail(action.getName());
170 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
171 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
172 workflow.setWorkflowInstance(wfInstance);
173 workflow.setStatus(WorkflowJob.Status.FAILED);
174 action.setStatus(WorkflowAction.Status.FAILED);
175 action.resetPending();
176 queueCallable(new NotificationCommand(workflow, action));
177 queueCallable(new KillCommand(workflow.getId()));
178 incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1);
179 }
180 catch (WorkflowException ex) {
181 throw new CommandException(ex);
182 }
183 }
184
185 private void incrActionErrorCounter(String type, String error, int count) {
186 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
187 }
188
189 protected void incrActionCounter(String type, int count) {
190 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
191 }
192
193 protected void addActionCron(String type, Instrumentation.Cron cron) {
194 getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
195 }
196
197 public static class ActionExecutorContext implements ActionExecutor.Context {
198 private WorkflowJobBean workflow;
199 private Configuration protoConf;
200 private WorkflowActionBean action;
201 private boolean isRetry;
202 private boolean started;
203 private boolean ended;
204 private boolean executed;
205
206 public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry) {
207 this.workflow = workflow;
208 this.action = action;
209 this.isRetry = isRetry;
210 try {
211 protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
212 }
213 catch (IOException ex) {
214 throw new RuntimeException("It should not happen", ex);
215 }
216 }
217
218 public String getCallbackUrl(String externalStatusVar) {
219 return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
220 }
221
222 public Configuration getProtoActionConf() {
223 return protoConf;
224 }
225
226 public WorkflowJob getWorkflow() {
227 return workflow;
228 }
229
230 public WorkflowAction getAction() {
231 return action;
232 }
233
234 public ELEvaluator getELEvaluator() {
235 ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
236 DagELFunctions.configureEvaluator(evaluator, workflow, action);
237 return evaluator;
238 }
239
240 public void setVar(String name, String value) {
241 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
242 WorkflowInstance wfInstance = workflow.getWorkflowInstance();
243 wfInstance.setVar(name, value);
244 //workflow.getWorkflowInstance().setVar(name, value);
245 workflow.setWorkflowInstance(wfInstance);
246 }
247
248 public String getVar(String name) {
249 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
250 return workflow.getWorkflowInstance().getVar(name);
251 }
252
253 public void setStartData(String externalId, String trackerUri, String consoleUrl) {
254 action.setStartData(externalId, trackerUri, consoleUrl);
255 started = true;
256 }
257
258 public void setExecutionData(String externalStatus, Properties actionData) {
259 action.setExecutionData(externalStatus, actionData);
260 executed = true;
261 }
262
263 public void setEndData(WorkflowAction.Status status, String signalValue) {
264 action.setEndData(status, signalValue);
265 ended = true;
266 }
267
268 public boolean isRetry() {
269 return isRetry;
270 }
271
272 /**
273 * Returns whether setStartData has been called or not.
274 *
275 * @return true if start completion info has been set.
276 */
277 public boolean isStarted() {
278 return started;
279 }
280
281 /**
282 * Returns whether setExecutionData has been called or not.
283 *
284 * @return true if execution completion info has been set, otherwise false.
285 */
286 public boolean isExecuted() {
287 return executed;
288 }
289
290
291 /**
292 * Returns whether setEndData has been called or not.
293 *
294 * @return true if end completion info has been set.
295 */
296 public boolean isEnded() {
297 return ended;
298 }
299
300 public void setExternalStatus(String externalStatus) {
301 action.setExternalStatus(externalStatus);
302 }
303
304 @Override
305 public String getRecoveryId() {
306 return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
307 }
308
309 /* (non-Javadoc)
310 * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
311 */
312 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
313 String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
314 FileSystem fs = getAppFileSystem();
315 String actionDirPath = Services.get().getSystemId() + "/" + name;
316 Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
317 return fqActionDir;
318 }
319
320 /* (non-Javadoc)
321 * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
322 */
323 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
324 WorkflowJob workflow = getWorkflow();
325 XConfiguration jobConf = new XConfiguration(new StringReader(workflow.getConf()));
326 Configuration fsConf = new Configuration();
327 XConfiguration.copy(jobConf, fsConf);
328 return Services.get().get(HadoopAccessorService.class).createFileSystem(workflow.getUser(),
329 workflow.getGroup(), new URI(getWorkflow().getAppPath()), fsConf);
330
331 }
332
333 @Override
334 public void setErrorInfo(String str, String exMsg) {
335 action.setErrorInfo(str, exMsg);
336 }
337 }
338
339 }