This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action;
019
020 import org.apache.hadoop.fs.FileSystem;
021 import org.apache.hadoop.fs.Path;
022 import org.apache.hadoop.conf.Configuration;
023 import org.apache.oozie.client.WorkflowAction;
024 import org.apache.oozie.client.WorkflowJob;
025 import org.apache.oozie.util.ELEvaluator;
026 import org.apache.oozie.util.ParamChecker;
027 import org.apache.oozie.util.XLog;
028 import org.apache.oozie.service.HadoopAccessorException;
029 import org.apache.oozie.service.Services;
030
031 import java.io.ByteArrayOutputStream;
032 import java.io.IOException;
033 import java.io.PrintStream;
034 import java.net.URISyntaxException;
035 import java.util.HashMap;
036 import java.util.Map;
037 import java.util.Properties;
038 import java.util.LinkedHashMap;
039
040 /**
041 * Base action executor class. <p/> All the action executors should extend this class.
042 */
043 public abstract class ActionExecutor {
044
045 /**
046 * Configuration prefix for action executor (sub-classes) properties.
047 */
048 public static final String CONF_PREFIX = "oozie.action.";
049
050 public static final String MAX_RETRIES = CONF_PREFIX + "retries.max";
051
052 /**
053 * Error code used by {@link #convertException} when there is not register error information for an exception.
054 */
055 public static final String ERROR_OTHER = "OTHER";
056
057 public boolean requiresNNJT = false;
058
059 private static class ErrorInfo {
060 ActionExecutorException.ErrorType errorType;
061 String errorCode;
062 Class<?> errorClass;
063
064 private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) {
065 this.errorType = errorType;
066 this.errorCode = errorCode;
067 this.errorClass = errorClass;
068 }
069 }
070
071 private static boolean initMode = false;
072 private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>();
073
074 /**
075 * Context information passed to the ActionExecutor methods.
076 */
077 public interface Context {
078
079 /**
080 * Create the callback URL for the action.
081 *
082 * @param externalStatusVar variable for the caller to inject the external status.
083 * @return the callback URL.
084 */
085 public String getCallbackUrl(String externalStatusVar);
086
087 /**
088 * Return a proto configuration for actions with auth properties already set.
089 *
090 * @return a proto configuration for actions with auth properties already set.
091 */
092 public Configuration getProtoActionConf();
093
094 /**
095 * Return the workflow job.
096 *
097 * @return the workflow job.
098 */
099 public WorkflowJob getWorkflow();
100
101 /**
102 * Return an ELEvaluator with the context injected.
103 *
104 * @return configured ELEvaluator.
105 */
106 public ELEvaluator getELEvaluator();
107
108 /**
109 * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
110 * plus a '.'.
111 *
112 * @param name variable name.
113 * @param value variable value, <code>null</code> removes the variable.
114 */
115 public void setVar(String name, String value);
116
117 /**
118 * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
119 * plus a '.'.
120 *
121 * @param name variable name.
122 * @return the variable value, <code>null</code> if not set.
123 */
124 public String getVar(String name);
125
126 /**
127 * Set the action tracking information for an successfully started action.
128 *
129 * @param externalId the action external ID.
130 * @param trackerUri the action tracker URI.
131 * @param consoleUrl the action console URL.
132 */
133 void setStartData(String externalId, String trackerUri, String consoleUrl);
134
135 /**
136 * Set the action execution completion information for an action. The action status is set to {@link
137 * org.apache.oozie.client.WorkflowAction.Status#DONE}
138 *
139 * @param externalStatus the action external end status.
140 * @param actionData the action data on completion, <code>null</code> if none.
141 */
142 void setExecutionData(String externalStatus, Properties actionData);
143
144 /**
145 * Set execution statistics information for a particular action. The action status is set to {@link
146 * org.apache.oozie.client.WorkflowAction.Status#DONE}
147 *
148 * @param jsonStats the JSON string representation of the stats.
149 */
150 void setExecutionStats(String jsonStats);
151
152 /**
153 * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link
154 * org.apache.oozie.client.WorkflowAction.Status#DONE}
155 *
156 * @param externalChildIDs the external child IDs as a comma-delimited string.
157 */
158 void setExternalChildIDs(String externalChildIDs);
159
160 /**
161 * Set the action end completion information for a completed action.
162 *
163 * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or
164 * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}.
165 * @param signalValue the action external end status.
166 */
167 void setEndData(WorkflowAction.Status status, String signalValue);
168
169 /**
170 * Return if the executor invocation is a retry or not.
171 *
172 * @return if the executor invocation is a retry or not.
173 */
174 boolean isRetry();
175
176 /**
177 * Sets the external status for the action in context.
178 *
179 * @param externalStatus the external status.
180 */
181 void setExternalStatus(String externalStatus);
182
183 /**
184 * Get the Action Recovery ID.
185 *
186 * @return recovery ID.
187 */
188 String getRecoveryId();
189
190 /*
191 * @return the path that will be used to store action specific data
192 * @throws IOException @throws URISyntaxException @throws HadoopAccessorException
193 */
194 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException;
195
196 /**
197 * @return filesystem handle for the application deployment fs.
198 * @throws IOException
199 * @throws URISyntaxException
200 * @throws HadoopAccessorException
201 */
202 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException;
203
204 public void setErrorInfo(String str, String exMsg);
205 }
206
207
208 /**
209 * Define the default inteval in seconds between retries.
210 */
211 public static final long RETRY_INTERVAL = 60;
212
213 private String type;
214 private int maxRetries;
215 private long retryInterval;
216
217 /**
218 * Create an action executor with default retry parameters.
219 *
220 * @param type action executor type.
221 */
222 protected ActionExecutor(String type) {
223 this(type, RETRY_INTERVAL);
224 }
225
226 /**
227 * Create an action executor.
228 *
229 * @param type action executor type.
230 * @param retryAttempts retry attempts.
231 * @param retryInterval retry interval, in seconds.
232 */
233 protected ActionExecutor(String type, long retryInterval) {
234 this.type = ParamChecker.notEmpty(type, "type");
235 this.maxRetries = getOozieConf().getInt(MAX_RETRIES, 3);
236 this.retryInterval = retryInterval;
237 }
238
239 /**
240 * Clear all init settings for all action types.
241 */
242 public static void resetInitInfo() {
243 if (!initMode) {
244 throw new IllegalStateException("Error, action type info locked");
245 }
246 ERROR_INFOS.clear();
247 }
248
249 /**
250 * Enable action type initialization.
251 */
252 public static void enableInit() {
253 initMode = true;
254 }
255
256 /**
257 * Disable action type initialization.
258 */
259 public static void disableInit() {
260 initMode = false;
261 }
262
263 /**
264 * Invoked once at system initialization time. <p/> It can be used to register error information for the expected
265 * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing
266 * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register
267 * all its possible errors. <p/> Subclasses overriding must invoke super.
268 */
269 public void initActionType() {
270 XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType());
271 ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>());
272 }
273
274 /**
275 * Return the system ID, this ID is defined in Oozie configuration.
276 *
277 * @return the system ID.
278 */
279 public String getOozieSystemId() {
280 return Services.get().getSystemId();
281 }
282
283 /**
284 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
285 * new directory per system initialization.
286 *
287 * @return the runtime directory of the Oozie instance.
288 */
289 public String getOozieRuntimeDir() {
290 return Services.get().getRuntimeDir();
291 }
292
293 /**
294 * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties.
295 *
296 * @return Oozie configuration.
297 */
298 public Configuration getOozieConf() {
299 return Services.get().getConf();
300 }
301
302 /**
303 * Register error handling information for an exception.
304 *
305 * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed
306 * to be able to handle multiple version of Hadoop or other JARs used by executors with the same codebase).
307 * @param errorType error type for the exception.
308 * @param errorCode error code for the exception.
309 */
310 protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) {
311 if (!initMode) {
312 throw new IllegalStateException("Error, action type info locked");
313 }
314 try {
315 Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
316 Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
317 executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass));
318 }
319 catch (ClassNotFoundException cnfe) {
320 XLog.getLog(getClass()).warn(
321 "Exception [{0}] not in classpath, ActionExecutor [{1}] will handle it as ERROR", exClass,
322 getType());
323 }
324 catch (java.lang.NoClassDefFoundError err) {
325 ByteArrayOutputStream baos = new ByteArrayOutputStream();
326 err.printStackTrace(new PrintStream(baos));
327 XLog.getLog(getClass()).warn(baos.toString());
328 }
329 }
330
331 /**
332 * Return the action executor type.
333 *
334 * @return the action executor type.
335 */
336 public String getType() {
337 return type;
338 }
339
340 /**
341 * Return the maximum number of retries for the action executor.
342 *
343 * @return the maximum number of retries for the action executor.
344 */
345 public int getMaxRetries() {
346 return maxRetries;
347 }
348
349 /**
350 * Set the maximum number of retries for the action executor.
351 *
352 * @param maxRetries the maximum number of retries.
353 */
354 public void setMaxRetries(int maxRetries) {
355 this.maxRetries = maxRetries;
356 }
357
358 /**
359 * Return the retry interval for the action executor in seconds.
360 *
361 * @return the retry interval for the action executor in seconds.
362 */
363 public long getRetryInterval() {
364 return retryInterval;
365 }
366
367 /**
368 * Sets the retry interval for the action executor.
369 *
370 * @param retryInterval retry interval in seconds.
371 */
372 public void setRetryInterval(long retryInterval) {
373 this.retryInterval = retryInterval;
374 }
375
376 /**
377 * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods
378 * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s.
379 *
380 * @param ex exception to convert.
381 * @return ActionExecutorException converted exception.
382 */
383 @SuppressWarnings({"ThrowableInstanceNeverThrown"})
384 protected ActionExecutorException convertException(Exception ex) {
385 if (ex instanceof ActionExecutorException) {
386 return (ActionExecutorException) ex;
387 }
388
389 ActionExecutorException aee = null;
390 // Check the cause of the exception first
391 if (ex.getCause() != null) {
392 aee = convertExceptionHelper(ex.getCause());
393 }
394 // If the cause isn't registered or doesn't exist, check the exception itself
395 if (aee == null) {
396 aee = convertExceptionHelper(ex);
397 // If the cause isn't registered either, then just create a new ActionExecutorException
398 if (aee == null) {
399 String exClass = ex.getClass().getName();
400 String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1);
401 aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex);
402 }
403 }
404 return aee;
405 }
406
407 private ActionExecutorException convertExceptionHelper(Throwable ex) {
408 Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
409 // Check if we have registered ex
410 ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName());
411 if (classErrorInfo != null) {
412 return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex);
413 }
414 // Else, check if a parent class of ex is registered
415 else {
416 for (ErrorInfo errorInfo : executorErrorInfo.values()) {
417 if (errorInfo.errorClass.isInstance(ex)) {
418 return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex);
419 }
420 }
421 }
422 return null;
423 }
424
425 /**
426 * Convenience method that return the signal for an Action based on the action status.
427 *
428 * @param status action status.
429 * @return the action signal.
430 */
431 protected String getActionSignal(WorkflowAction.Status status) {
432 switch (status) {
433 case OK:
434 return "OK";
435 case ERROR:
436 case KILLED:
437 return "ERROR";
438 default:
439 throw new IllegalArgumentException("Action status for signal can only be OK or ERROR");
440 }
441 }
442
443 /**
444 * Return the path that will be used to store action specific data
445 *
446 * @param jobId Worfklow ID
447 * @param action Action
448 * @param key An Identifier
449 * @param temp temp directory flag
450 * @return A string that has the path
451 */
452 protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) {
453 String name = jobId + "/" + action.getName() + "--" + key;
454 if (temp) {
455 name += ".temp";
456 }
457 return getOozieSystemId() + "/" + name;
458 }
459
460 /**
461 * Return the path that will be used to store action specific data.
462 *
463 * @param jobId Workflow ID
464 * @param action Action
465 * @param key An identifier
466 * @param temp Temp directory flag
467 * @return Path to the directory
468 */
469 public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) {
470 return new Path(getActionDirPath(jobId, action, key, temp));
471 }
472
473 /**
474 * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the
475 * action has completed, the {@link Context#setExecutionData} method must be called within this method.
476 *
477 * @param context executor context.
478 * @param action the action to start.
479 * @throws ActionExecutorException thrown if the action could not start.
480 */
481 public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException;
482
483 /**
484 * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this
485 * method.
486 *
487 * @param context executor context.
488 * @param action the action to end.
489 * @throws ActionExecutorException thrown if the action could not end.
490 */
491 public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException;
492
493 /**
494 * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action
495 * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action
496 * has not completed, the {@link Context#setExternalStatus} method must be called within this method.
497 *
498 * @param context executor context.
499 * @param action the action to end.
500 * @throws ActionExecutorException thrown if the action could not be checked.
501 */
502 public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException;
503
504 /**
505 * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method.
506 *
507 * @param context executor context.
508 * @param action the action to kill.
509 * @throws ActionExecutorException thrown if the action could not be killed.
510 */
511 public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException;
512
513 /**
514 * Return if the external status indicates that the action has completed.
515 *
516 * @param externalStatus external status to check.
517 * @return if the external status indicates that the action has completed.
518 */
519 public abstract boolean isCompleted(String externalStatus);
520
521 }