This project has retired. For details please refer to its
Attic page.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018 package org.apache.oozie.action;
019
020 import org.apache.hadoop.fs.FileSystem;
021 import org.apache.hadoop.fs.Path;
022 import org.apache.hadoop.conf.Configuration;
023 import org.apache.oozie.client.WorkflowAction;
024 import org.apache.oozie.client.WorkflowJob;
025 import org.apache.oozie.util.ELEvaluator;
026 import org.apache.oozie.util.ParamChecker;
027 import org.apache.oozie.util.XLog;
028 import org.apache.oozie.service.HadoopAccessorException;
029 import org.apache.oozie.service.Services;
030
031 import java.io.IOException;
032 import java.net.URISyntaxException;
033 import java.util.HashMap;
034 import java.util.Map;
035 import java.util.Properties;
036 import java.util.LinkedHashMap;
037
038 /**
039 * Base action executor class. <p/> All the action executors should extend this class.
040 */
041 public abstract class ActionExecutor {
042
043 /**
044 * Configuration prefix for action executor (sub-classes) properties.
045 */
046 public static final String CONF_PREFIX = "oozie.action.";
047
048 /**
049 * Error code used by {@link #convertException} when there is not register error information for an exception.
050 */
051 public static final String ERROR_OTHER = "OTHER";
052
053 private static class ErrorInfo {
054 ActionExecutorException.ErrorType errorType;
055 String errorCode;
056
057 private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode) {
058 this.errorType = errorType;
059 this.errorCode = errorCode;
060 }
061 }
062
063 private static boolean initMode = false;
064 private static Map<String, Map<Class, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<Class, ErrorInfo>>();
065
066 /**
067 * Context information passed to the ActionExecutor methods.
068 */
069 public interface Context {
070
071 /**
072 * Create the callback URL for the action.
073 *
074 * @param externalStatusVar variable for the caller to inject the external status.
075 * @return the callback URL.
076 */
077 public String getCallbackUrl(String externalStatusVar);
078
079 /**
080 * Return a proto configuration for actions with auth properties already set.
081 *
082 * @return a proto configuration for actions with auth properties already set.
083 */
084 public Configuration getProtoActionConf();
085
086 /**
087 * Return the workflow job.
088 *
089 * @return the workflow job.
090 */
091 public WorkflowJob getWorkflow();
092
093 /**
094 * Return an ELEvaluator with the context injected.
095 *
096 * @return configured ELEvaluator.
097 */
098 public ELEvaluator getELEvaluator();
099
100 /**
101 * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
102 * plus a '.'.
103 *
104 * @param name variable name.
105 * @param value variable value, <code>null</code> removes the variable.
106 */
107 public void setVar(String name, String value);
108
109 /**
110 * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
111 * plus a '.'.
112 *
113 * @param name variable name.
114 * @return the variable value, <code>null</code> if not set.
115 */
116 public String getVar(String name);
117
118 /**
119 * Set the action tracking information for an successfully started action.
120 *
121 * @param externalId the action external ID.
122 * @param trackerUri the action tracker URI.
123 * @param consoleUrl the action console URL.
124 */
125 void setStartData(String externalId, String trackerUri, String consoleUrl);
126
127 /**
128 * Set the action execution completion information for an action. The action status is set to {@link
129 * org.apache.oozie.client.WorkflowAction.Status#DONE}
130 *
131 * @param externalStatus the action external end status.
132 * @param actionData the action data on completion, <code>null</code> if none.
133 */
134 void setExecutionData(String externalStatus, Properties actionData);
135
136 /**
137 * Set the action end completion information for a completed action.
138 *
139 * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or
140 * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}.
141 * @param signalValue the action external end status.
142 */
143 void setEndData(WorkflowAction.Status status, String signalValue);
144
145 /**
146 * Return if the executor invocation is a retry or not.
147 *
148 * @return if the executor invocation is a retry or not.
149 */
150 boolean isRetry();
151
152 /**
153 * Sets the external status for the action in context.
154 *
155 * @param externalStatus the external status.
156 */
157 void setExternalStatus(String externalStatus);
158
159 /**
160 * Get the Action Recovery ID.
161 *
162 * @return recovery ID.
163 */
164 String getRecoveryId();
165
166 /*
167 * @return the path that will be used to store action specific data
168 * @throws IOException @throws URISyntaxException @throws HadoopAccessorException
169 */
170 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException;
171
172 /**
173 * @return filesystem handle for the application deployment fs.
174 * @throws IOException
175 * @throws URISyntaxException
176 * @throws HadoopAccessorException
177 */
178 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException;
179
180 public void setErrorInfo(String str, String exMsg);
181 }
182
183 /**
184 * Define the default maximum number of retry attempts for transient errors (total attempts = 1 + MAX_RETRIES).
185 */
186 public static final int MAX_RETRIES = 3;
187
188 /**
189 * Define the default inteval in seconds between retries.
190 */
191 public static final long RETRY_INTERVAL = 60;
192
193 private String type;
194 private int maxRetries;
195 private long retryInterval;
196
197 /**
198 * Create an action executor with default retry parameters.
199 *
200 * @param type action executor type.
201 */
202 protected ActionExecutor(String type) {
203 this(type, MAX_RETRIES, RETRY_INTERVAL);
204 }
205
206 /**
207 * Create an action executor.
208 *
209 * @param type action executor type.
210 * @param retryAttempts retry attempts.
211 * @param retryInterval retry interval, in seconds.
212 */
213 protected ActionExecutor(String type, int retryAttempts, long retryInterval) {
214 this.type = ParamChecker.notEmpty(type, "type");
215 this.maxRetries = retryAttempts;
216 this.retryInterval = retryInterval;
217 }
218
219 /**
220 * Clear all init settings for all action types.
221 */
222 public static void resetInitInfo() {
223 if (!initMode) {
224 throw new IllegalStateException("Error, action type info locked");
225 }
226 ERROR_INFOS.clear();
227 }
228
229 /**
230 * Enable action type initialization.
231 */
232 public static void enableInit() {
233 initMode = true;
234 }
235
236 /**
237 * Disable action type initialization.
238 */
239 public static void disableInit() {
240 initMode = false;
241 }
242
243 /**
244 * Invoked once at system initialization time. <p/> It can be used to register error information for the expected
245 * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing
246 * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register
247 * all its possible errors. <p/> Subclasses overriding must invoke super.
248 */
249 public void initActionType() {
250 ERROR_INFOS.put(getType(), new LinkedHashMap<Class, ErrorInfo>());
251 }
252
253 /**
254 * Return the system ID, this ID is defined in Oozie configuration.
255 *
256 * @return the system ID.
257 */
258 public String getOozieSystemId() {
259 return Services.get().getSystemId();
260 }
261
262 /**
263 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
264 * new directory per system initialization.
265 *
266 * @return the runtime directory of the Oozie instance.
267 */
268 public String getOozieRuntimeDir() {
269 return Services.get().getRuntimeDir();
270 }
271
272 /**
273 * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties.
274 *
275 * @return Oozie configuration.
276 */
277 public Configuration getOozieConf() {
278 return Services.get().getConf();
279 }
280
281 /**
282 * Register error handling information for an exception.
283 *
284 * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed
285 * to be able to handle multiple version of Hadoop or other JARs used by executors with the same codebase).
286 * @param errorType error type for the exception.
287 * @param errorCode error code for the exception.
288 */
289 protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) {
290 if (!initMode) {
291 throw new IllegalStateException("Error, action type info locked");
292 }
293 try {
294 Class klass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
295 Map<Class, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
296 executorErrorInfo.put(klass, new ErrorInfo(errorType, errorCode));
297 }
298 catch (ClassNotFoundException ex) {
299 XLog.getLog(getClass()).warn(
300 "Exception [{0}] no in classpath, ActionExecutor [{1}] will handled it as ERROR", exClass,
301 getType());
302 }
303 }
304
305 /**
306 * Return the action executor type.
307 *
308 * @return the action executor type.
309 */
310 public String getType() {
311 return type;
312 }
313
314 /**
315 * Return the maximum number of retries for the action executor.
316 *
317 * @return the maximum number of retries for the action executor.
318 */
319 public int getMaxRetries() {
320 return maxRetries;
321 }
322
323 /**
324 * Set the maximum number of retries for the action executor.
325 *
326 * @param maxRetries the maximum number of retries.
327 */
328 public void setMaxRetries(int maxRetries) {
329 this.maxRetries = maxRetries;
330 }
331
332 /**
333 * Return the retry interval for the action executor in seconds.
334 *
335 * @return the retry interval for the action executor in seconds.
336 */
337 public long getRetryInterval() {
338 return retryInterval;
339 }
340
341 /**
342 * Sets the retry interval for the action executor.
343 *
344 * @param retryInterval retry interval in seconds.
345 */
346 public void setRetryInterval(long retryInterval) {
347 this.retryInterval = retryInterval;
348 }
349
350 /**
351 * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods
352 * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s.
353 *
354 * @param ex exception to convert.
355 * @return ActionExecutorException converted exception.
356 */
357 @SuppressWarnings({"ThrowableInstanceNeverThrown"})
358 protected ActionExecutorException convertException(Exception ex) {
359 if (ex instanceof ActionExecutorException) {
360 return (ActionExecutorException) ex;
361 }
362 for (Map.Entry<Class, ErrorInfo> errorInfo : ERROR_INFOS.get(getType()).entrySet()) {
363 if (errorInfo.getKey().isInstance(ex)) {
364 return new ActionExecutorException(errorInfo.getValue().errorType, errorInfo.getValue().errorCode,
365 "{0}", ex.getMessage(), ex);
366 }
367 }
368 String errorCode = ex.getClass().getName();
369 errorCode = errorCode.substring(errorCode.lastIndexOf(".") + 1);
370 return new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(),
371 ex);
372 }
373
374 /**
375 * Convenience method that return the signal for an Action based on the action status.
376 *
377 * @param status action status.
378 * @return the action signal.
379 */
380 protected String getActionSignal(WorkflowAction.Status status) {
381 switch (status) {
382 case OK:
383 return "OK";
384 case ERROR:
385 case KILLED:
386 return "ERROR";
387 default:
388 throw new IllegalArgumentException("Action status for signal can only be OK or ERROR");
389 }
390 }
391
392 /**
393 * Return the path that will be used to store action specific data
394 *
395 * @param jobId Worfklow ID
396 * @param action Action
397 * @param key An Identifier
398 * @param temp temp directory flag
399 * @return A string that has the path
400 */
401 protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) {
402 String name = jobId + "/" + action.getName() + "--" + key;
403 if (temp) {
404 name += ".temp";
405 }
406 return getOozieSystemId() + "/" + name;
407 }
408
409 /**
410 * Return the path that will be used to store action specific data.
411 *
412 * @param jobId Workflow ID
413 * @param action Action
414 * @param key An identifier
415 * @param temp Temp directory flag
416 * @return Path to the directory
417 */
418 public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) {
419 return new Path(getActionDirPath(jobId, action, key, temp));
420 }
421
422 /**
423 * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the
424 * action has completed, the {@link Context#setExecutionData} method must be called within this method.
425 *
426 * @param context executor context.
427 * @param action the action to start.
428 * @throws ActionExecutorException thrown if the action could not start.
429 */
430 public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException;
431
432 /**
433 * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this
434 * method.
435 *
436 * @param context executor context.
437 * @param action the action to end.
438 * @throws ActionExecutorException thrown if the action could not end.
439 */
440 public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException;
441
442 /**
443 * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action
444 * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action
445 * has not completed, the {@link Context#setExternalStatus} method must be called within this method.
446 *
447 * @param context executor context.
448 * @param action the action to end.
449 * @throws ActionExecutorException thrown if the action could not be checked.
450 */
451 public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException;
452
453 /**
454 * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method.
455 *
456 * @param context executor context.
457 * @param action the action to kill.
458 * @throws ActionExecutorException thrown if the action could not be killed.
459 */
460 public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException;
461
462 /**
463 * Return if the external status indicates that the action has completed.
464 *
465 * @param externalStatus external status to check.
466 * @return if the external status indicates that the action has completed.
467 */
468 public abstract boolean isCompleted(String externalStatus);
469
470 }