This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action;
019
020 import org.apache.hadoop.fs.FileSystem;
021 import org.apache.hadoop.fs.Path;
022 import org.apache.hadoop.conf.Configuration;
023 import org.apache.oozie.client.WorkflowAction;
024 import org.apache.oozie.client.WorkflowJob;
025 import org.apache.oozie.util.ELEvaluator;
026 import org.apache.oozie.util.ParamChecker;
027 import org.apache.oozie.util.XLog;
028 import org.apache.oozie.service.HadoopAccessorException;
029 import org.apache.oozie.service.Services;
030 import org.apache.oozie.servlet.CallbackServlet;
031
032 import java.io.IOException;
033 import java.net.URISyntaxException;
034 import java.util.HashMap;
035 import java.util.Map;
036 import java.util.Properties;
037 import java.util.LinkedHashMap;
038
039 /**
040 * Base action executor class. <p/> All the action executors should extend this class.
041 */
042 public abstract class ActionExecutor {
043
044 /**
045 * Configuration prefix for action executor (sub-classes) properties.
046 */
047 public static final String CONF_PREFIX = "oozie.action.";
048
049 public static final String MAX_RETRIES = CONF_PREFIX + "retries.max";
050
051 /**
052 * Error code used by {@link #convertException} when there is not register error information for an exception.
053 */
054 public static final String ERROR_OTHER = "OTHER";
055
056 private static class ErrorInfo {
057 ActionExecutorException.ErrorType errorType;
058 String errorCode;
059
060 private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode) {
061 this.errorType = errorType;
062 this.errorCode = errorCode;
063 }
064 }
065
066 private static boolean initMode = false;
067 private static Map<String, Map<Class, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<Class, ErrorInfo>>();
068
069 /**
070 * Context information passed to the ActionExecutor methods.
071 */
072 public interface Context {
073
074 /**
075 * Create the callback URL for the action.
076 *
077 * @param externalStatusVar variable for the caller to inject the external status.
078 * @return the callback URL.
079 */
080 public String getCallbackUrl(String externalStatusVar);
081
082 /**
083 * Return a proto configuration for actions with auth properties already set.
084 *
085 * @return a proto configuration for actions with auth properties already set.
086 */
087 public Configuration getProtoActionConf();
088
089 /**
090 * Return the workflow job.
091 *
092 * @return the workflow job.
093 */
094 public WorkflowJob getWorkflow();
095
096 /**
097 * Return an ELEvaluator with the context injected.
098 *
099 * @return configured ELEvaluator.
100 */
101 public ELEvaluator getELEvaluator();
102
103 /**
104 * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
105 * plus a '.'.
106 *
107 * @param name variable name.
108 * @param value variable value, <code>null</code> removes the variable.
109 */
110 public void setVar(String name, String value);
111
112 /**
113 * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
114 * plus a '.'.
115 *
116 * @param name variable name.
117 * @return the variable value, <code>null</code> if not set.
118 */
119 public String getVar(String name);
120
121 /**
122 * Set the action tracking information for an successfully started action.
123 *
124 * @param externalId the action external ID.
125 * @param trackerUri the action tracker URI.
126 * @param consoleUrl the action console URL.
127 */
128 void setStartData(String externalId, String trackerUri, String consoleUrl);
129
130 /**
131 * Set the action execution completion information for an action. The action status is set to {@link
132 * org.apache.oozie.client.WorkflowAction.Status#DONE}
133 *
134 * @param externalStatus the action external end status.
135 * @param actionData the action data on completion, <code>null</code> if none.
136 */
137 void setExecutionData(String externalStatus, Properties actionData);
138
139 /**
140 * Set execution statistics information for a particular action. The action status is set to {@link
141 * org.apache.oozie.client.WorkflowAction.Status#DONE}
142 *
143 * @param jsonStats the JSON string representation of the stats.
144 */
145 void setExecutionStats(String jsonStats);
146
147 /**
148 * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link
149 * org.apache.oozie.client.WorkflowAction.Status#DONE}
150 *
151 * @param externalChildIDs the external child IDs as a comma-delimited string.
152 */
153 void setExternalChildIDs(String externalChildIDs);
154
155 /**
156 * Set the action end completion information for a completed action.
157 *
158 * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or
159 * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}.
160 * @param signalValue the action external end status.
161 */
162 void setEndData(WorkflowAction.Status status, String signalValue);
163
164 /**
165 * Return if the executor invocation is a retry or not.
166 *
167 * @return if the executor invocation is a retry or not.
168 */
169 boolean isRetry();
170
171 /**
172 * Sets the external status for the action in context.
173 *
174 * @param externalStatus the external status.
175 */
176 void setExternalStatus(String externalStatus);
177
178 /**
179 * Get the Action Recovery ID.
180 *
181 * @return recovery ID.
182 */
183 String getRecoveryId();
184
185 /*
186 * @return the path that will be used to store action specific data
187 * @throws IOException @throws URISyntaxException @throws HadoopAccessorException
188 */
189 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException;
190
191 /**
192 * @return filesystem handle for the application deployment fs.
193 * @throws IOException
194 * @throws URISyntaxException
195 * @throws HadoopAccessorException
196 */
197 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException;
198
199 public void setErrorInfo(String str, String exMsg);
200 }
201
202
203 /**
204 * Define the default inteval in seconds between retries.
205 */
206 public static final long RETRY_INTERVAL = 60;
207
208 private String type;
209 private int maxRetries;
210 private long retryInterval;
211
212 /**
213 * Create an action executor with default retry parameters.
214 *
215 * @param type action executor type.
216 */
217 protected ActionExecutor(String type) {
218 this(type, RETRY_INTERVAL);
219 }
220
221 /**
222 * Create an action executor.
223 *
224 * @param type action executor type.
225 * @param retryAttempts retry attempts.
226 * @param retryInterval retry interval, in seconds.
227 */
228 protected ActionExecutor(String type, long retryInterval) {
229 this.type = ParamChecker.notEmpty(type, "type");
230 this.maxRetries = getOozieConf().getInt(MAX_RETRIES, 3);
231 this.retryInterval = retryInterval;
232 }
233
234 /**
235 * Clear all init settings for all action types.
236 */
237 public static void resetInitInfo() {
238 if (!initMode) {
239 throw new IllegalStateException("Error, action type info locked");
240 }
241 ERROR_INFOS.clear();
242 }
243
244 /**
245 * Enable action type initialization.
246 */
247 public static void enableInit() {
248 initMode = true;
249 }
250
251 /**
252 * Disable action type initialization.
253 */
254 public static void disableInit() {
255 initMode = false;
256 }
257
258 /**
259 * Invoked once at system initialization time. <p/> It can be used to register error information for the expected
260 * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing
261 * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register
262 * all its possible errors. <p/> Subclasses overriding must invoke super.
263 */
264 public void initActionType() {
265 ERROR_INFOS.put(getType(), new LinkedHashMap<Class, ErrorInfo>());
266 }
267
268 /**
269 * Return the system ID, this ID is defined in Oozie configuration.
270 *
271 * @return the system ID.
272 */
273 public String getOozieSystemId() {
274 return Services.get().getSystemId();
275 }
276
277 /**
278 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
279 * new directory per system initialization.
280 *
281 * @return the runtime directory of the Oozie instance.
282 */
283 public String getOozieRuntimeDir() {
284 return Services.get().getRuntimeDir();
285 }
286
287 /**
288 * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties.
289 *
290 * @return Oozie configuration.
291 */
292 public Configuration getOozieConf() {
293 return Services.get().getConf();
294 }
295
296 /**
297 * Register error handling information for an exception.
298 *
299 * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed
300 * to be able to handle multiple version of Hadoop or other JARs used by executors with the same codebase).
301 * @param errorType error type for the exception.
302 * @param errorCode error code for the exception.
303 */
304 protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) {
305 if (!initMode) {
306 throw new IllegalStateException("Error, action type info locked");
307 }
308 try {
309 Class klass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
310 Map<Class, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
311 executorErrorInfo.put(klass, new ErrorInfo(errorType, errorCode));
312 }
313 catch (ClassNotFoundException ex) {
314 XLog.getLog(getClass()).warn(
315 "Exception [{0}] no in classpath, ActionExecutor [{1}] will handled it as ERROR", exClass,
316 getType());
317 }
318 }
319
320 /**
321 * Return the action executor type.
322 *
323 * @return the action executor type.
324 */
325 public String getType() {
326 return type;
327 }
328
329 /**
330 * Return the maximum number of retries for the action executor.
331 *
332 * @return the maximum number of retries for the action executor.
333 */
334 public int getMaxRetries() {
335 return maxRetries;
336 }
337
338 /**
339 * Set the maximum number of retries for the action executor.
340 *
341 * @param maxRetries the maximum number of retries.
342 */
343 public void setMaxRetries(int maxRetries) {
344 this.maxRetries = maxRetries;
345 }
346
347 /**
348 * Return the retry interval for the action executor in seconds.
349 *
350 * @return the retry interval for the action executor in seconds.
351 */
352 public long getRetryInterval() {
353 return retryInterval;
354 }
355
356 /**
357 * Sets the retry interval for the action executor.
358 *
359 * @param retryInterval retry interval in seconds.
360 */
361 public void setRetryInterval(long retryInterval) {
362 this.retryInterval = retryInterval;
363 }
364
365 /**
366 * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods
367 * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s.
368 *
369 * @param ex exception to convert.
370 * @return ActionExecutorException converted exception.
371 */
372 @SuppressWarnings({"ThrowableInstanceNeverThrown"})
373 protected ActionExecutorException convertException(Exception ex) {
374 if (ex instanceof ActionExecutorException) {
375 return (ActionExecutorException) ex;
376 }
377 for (Map.Entry<Class, ErrorInfo> errorInfo : ERROR_INFOS.get(getType()).entrySet()) {
378 if (errorInfo.getKey().isInstance(ex)) {
379 return new ActionExecutorException(errorInfo.getValue().errorType, errorInfo.getValue().errorCode,
380 "{0}", ex.getMessage(), ex);
381 }
382 }
383 String errorCode = ex.getClass().getName();
384 errorCode = errorCode.substring(errorCode.lastIndexOf(".") + 1);
385 return new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(),
386 ex);
387 }
388
389 /**
390 * Convenience method that return the signal for an Action based on the action status.
391 *
392 * @param status action status.
393 * @return the action signal.
394 */
395 protected String getActionSignal(WorkflowAction.Status status) {
396 switch (status) {
397 case OK:
398 return "OK";
399 case ERROR:
400 case KILLED:
401 return "ERROR";
402 default:
403 throw new IllegalArgumentException("Action status for signal can only be OK or ERROR");
404 }
405 }
406
407 /**
408 * Return the path that will be used to store action specific data
409 *
410 * @param jobId Worfklow ID
411 * @param action Action
412 * @param key An Identifier
413 * @param temp temp directory flag
414 * @return A string that has the path
415 */
416 protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) {
417 String name = jobId + "/" + action.getName() + "--" + key;
418 if (temp) {
419 name += ".temp";
420 }
421 return getOozieSystemId() + "/" + name;
422 }
423
424 /**
425 * Return the path that will be used to store action specific data.
426 *
427 * @param jobId Workflow ID
428 * @param action Action
429 * @param key An identifier
430 * @param temp Temp directory flag
431 * @return Path to the directory
432 */
433 public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) {
434 return new Path(getActionDirPath(jobId, action, key, temp));
435 }
436
437 /**
438 * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the
439 * action has completed, the {@link Context#setExecutionData} method must be called within this method.
440 *
441 * @param context executor context.
442 * @param action the action to start.
443 * @throws ActionExecutorException thrown if the action could not start.
444 */
445 public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException;
446
447 /**
448 * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this
449 * method.
450 *
451 * @param context executor context.
452 * @param action the action to end.
453 * @throws ActionExecutorException thrown if the action could not end.
454 */
455 public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException;
456
457 /**
458 * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action
459 * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action
460 * has not completed, the {@link Context#setExternalStatus} method must be called within this method.
461 *
462 * @param context executor context.
463 * @param action the action to end.
464 * @throws ActionExecutorException thrown if the action could not be checked.
465 */
466 public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException;
467
468 /**
469 * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method.
470 *
471 * @param context executor context.
472 * @param action the action to kill.
473 * @throws ActionExecutorException thrown if the action could not be killed.
474 */
475 public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException;
476
477 /**
478 * Return if the external status indicates that the action has completed.
479 *
480 * @param externalStatus external status to check.
481 * @return if the external status indicates that the action has completed.
482 */
483 public abstract boolean isCompleted(String externalStatus);
484
485 }