001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action;
019
020import org.apache.hadoop.fs.FileSystem;
021import org.apache.hadoop.fs.Path;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.oozie.client.WorkflowAction;
024import org.apache.oozie.client.WorkflowJob;
025import org.apache.oozie.util.ELEvaluator;
026import org.apache.oozie.util.ParamChecker;
027import org.apache.oozie.util.XLog;
028import org.apache.oozie.service.HadoopAccessorException;
029import org.apache.oozie.service.Services;
030
031import java.io.ByteArrayOutputStream;
032import java.io.IOException;
033import java.io.PrintStream;
034import java.net.URISyntaxException;
035import java.util.HashMap;
036import java.util.Map;
037import java.util.Properties;
038import java.util.LinkedHashMap;
039
040/**
041 * Base action executor class. <p/> All the action executors should extend this class.
042 */
043public abstract class ActionExecutor {
044
045    /**
046     * Configuration prefix for action executor (sub-classes) properties.
047     */
048        public static final String CONF_PREFIX = "oozie.action.";
049
050        public static final String MAX_RETRIES = CONF_PREFIX + "retries.max";
051
052    /**
053     * Error code used by {@link #convertException} when there is not register error information for an exception.
054     */
055    public static final String ERROR_OTHER = "OTHER";
056    
057    public boolean requiresNNJT = false;
058
059    private static class ErrorInfo {
060        ActionExecutorException.ErrorType errorType;
061        String errorCode;
062        Class<?> errorClass;
063
064        private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) {
065            this.errorType = errorType;
066            this.errorCode = errorCode;
067            this.errorClass = errorClass;
068        }
069    }
070
071    private static boolean initMode = false;
072    private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>();
073
074    /**
075     * Context information passed to the ActionExecutor methods.
076     */
077    public interface Context {
078
079        /**
080         * Create the callback URL for the action.
081         *
082         * @param externalStatusVar variable for the caller to inject the external status.
083         * @return the callback URL.
084         */
085        public String getCallbackUrl(String externalStatusVar);
086
087        /**
088         * Return a proto configuration for actions with auth properties already set.
089         *
090         * @return a proto configuration for actions with auth properties already set.
091         */
092        public Configuration getProtoActionConf();
093
094        /**
095         * Return the workflow job.
096         *
097         * @return the workflow job.
098         */
099        public WorkflowJob getWorkflow();
100
101        /**
102         * Return an ELEvaluator with the context injected.
103         *
104         * @return configured ELEvaluator.
105         */
106        public ELEvaluator getELEvaluator();
107
108        /**
109         * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
110         * plus a '.'.
111         *
112         * @param name variable name.
113         * @param value variable value, <code>null</code> removes the variable.
114         */
115        public void setVar(String name, String value);
116
117        /**
118         * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
119         * plus a '.'.
120         *
121         * @param name variable name.
122         * @return the variable value, <code>null</code> if not set.
123         */
124        public String getVar(String name);
125
126        /**
127         * Set the action tracking information for an successfully started action.
128         *
129         * @param externalId the action external ID.
130         * @param trackerUri the action tracker URI.
131         * @param consoleUrl the action console URL.
132         */
133        void setStartData(String externalId, String trackerUri, String consoleUrl);
134
135        /**
136         * Set the action execution completion information for an action. The action status is set to {@link
137         * org.apache.oozie.client.WorkflowAction.Status#DONE}
138         *
139         * @param externalStatus the action external end status.
140         * @param actionData the action data on completion, <code>null</code> if none.
141         */
142        void setExecutionData(String externalStatus, Properties actionData);
143
144        /**
145         * Set execution statistics information for a particular action. The action status is set to {@link
146         * org.apache.oozie.client.WorkflowAction.Status#DONE}
147         *
148         * @param jsonStats the JSON string representation of the stats.
149         */
150        void setExecutionStats(String jsonStats);
151
152        /**
153         * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link
154         * org.apache.oozie.client.WorkflowAction.Status#DONE}
155         *
156         * @param externalChildIDs the external child IDs as a comma-delimited string.
157         */
158        void setExternalChildIDs(String externalChildIDs);
159
160        /**
161         * Set the action end completion information for a completed action.
162         *
163         * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or
164         * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}.
165         * @param signalValue the action external end status.
166         */
167        void setEndData(WorkflowAction.Status status, String signalValue);
168
169        /**
170         * Return if the executor invocation is a retry or not.
171         *
172         * @return if the executor invocation is a retry or not.
173         */
174        boolean isRetry();
175
176        /**
177         * Sets the external status for the action in context.
178         *
179         * @param externalStatus the external status.
180         */
181        void setExternalStatus(String externalStatus);
182
183        /**
184         * Get the Action Recovery ID.
185         *
186         * @return recovery ID.
187         */
188        String getRecoveryId();
189
190        /*
191         * @return the path that will be used to store action specific data
192         * @throws IOException @throws URISyntaxException @throws HadoopAccessorException
193         */
194        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException;
195
196        /**
197         * @return filesystem handle for the application deployment fs.
198         * @throws IOException
199         * @throws URISyntaxException
200         * @throws HadoopAccessorException
201         */
202        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException;
203
204        public void setErrorInfo(String str, String exMsg);
205    }
206
207
208    /**
209     * Define the default inteval in seconds between retries.
210     */
211    public static final long RETRY_INTERVAL = 60;
212
213    private String type;
214    private int maxRetries;
215    private long retryInterval;
216
217    /**
218     * Create an action executor with default retry parameters.
219     *
220     * @param type action executor type.
221     */
222    protected ActionExecutor(String type) {
223        this(type, RETRY_INTERVAL);
224    }
225
226    /**
227     * Create an action executor.
228     *
229     * @param type action executor type.
230     * @param retryAttempts retry attempts.
231     * @param retryInterval retry interval, in seconds.
232     */
233    protected ActionExecutor(String type, long retryInterval) {
234        this.type = ParamChecker.notEmpty(type, "type");
235        this.maxRetries = getOozieConf().getInt(MAX_RETRIES, 3);
236        this.retryInterval = retryInterval;
237    }
238
239    /**
240     * Clear all init settings for all action types.
241     */
242    public static void resetInitInfo() {
243        if (!initMode) {
244            throw new IllegalStateException("Error, action type info locked");
245        }
246        ERROR_INFOS.clear();
247    }
248
249    /**
250     * Enable action type initialization.
251     */
252    public static void enableInit() {
253        initMode = true;
254    }
255
256    /**
257     * Disable action type initialization.
258     */
259    public static void disableInit() {
260        initMode = false;
261    }
262
263    /**
264     * Invoked once at system initialization time. <p/> It can be used to register error information for the expected
265     * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing
266     * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register
267     * all its possible errors. <p/> Subclasses overriding must invoke super.
268     */
269    public void initActionType() {
270        XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType());
271        ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>());
272    }
273
274    /**
275     * Return the system ID, this ID is defined in Oozie configuration.
276     *
277     * @return the system ID.
278     */
279    public String getOozieSystemId() {
280        return Services.get().getSystemId();
281    }
282
283    /**
284     * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
285     * new directory per system initialization.
286     *
287     * @return the runtime directory of the Oozie instance.
288     */
289    public String getOozieRuntimeDir() {
290        return Services.get().getRuntimeDir();
291    }
292
293    /**
294     * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties.
295     *
296     * @return Oozie configuration.
297     */
298    public Configuration getOozieConf() {
299        return Services.get().getConf();
300    }
301
302    /**
303     * Register error handling information for an exception.
304     *
305     * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed
306     * to be able to handle multiple version of Hadoop  or other JARs used by executors with the same codebase).
307     * @param errorType error type for the exception.
308     * @param errorCode error code for the exception.
309     */
310    protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) {
311        if (!initMode) {
312            throw new IllegalStateException("Error, action type info locked");
313        }
314        try {
315            Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
316            Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
317            executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass));
318        }
319        catch (ClassNotFoundException cnfe) {
320            XLog.getLog(getClass()).warn(
321                    "Exception [{0}] not in classpath, ActionExecutor [{1}] will handle it as ERROR", exClass,
322                    getType());
323        }
324        catch (java.lang.NoClassDefFoundError err) {
325            ByteArrayOutputStream baos = new ByteArrayOutputStream();
326            err.printStackTrace(new PrintStream(baos));
327            XLog.getLog(getClass()).warn(baos.toString());
328        }
329    }
330
331    /**
332     * Return the action executor type.
333     *
334     * @return the action executor type.
335     */
336    public String getType() {
337        return type;
338    }
339
340    /**
341     * Return the maximum number of retries for the action executor.
342     *
343     * @return the maximum number of retries for the action executor.
344     */
345    public int getMaxRetries() {
346        return maxRetries;
347    }
348
349    /**
350     * Set the maximum number of retries for the action executor.
351     *
352     * @param maxRetries the maximum number of retries.
353     */
354    public void setMaxRetries(int maxRetries) {
355        this.maxRetries = maxRetries;
356    }
357
358    /**
359     * Return the retry interval for the action executor in seconds.
360     *
361     * @return the retry interval for the action executor in seconds.
362     */
363    public long getRetryInterval() {
364        return retryInterval;
365    }
366
367    /**
368     * Sets the retry interval for the action executor.
369     *
370     * @param retryInterval retry interval in seconds.
371     */
372    public void setRetryInterval(long retryInterval) {
373        this.retryInterval = retryInterval;
374    }
375
376    /**
377     * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods
378     * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s.
379     *
380     * @param ex exception to convert.
381     * @return ActionExecutorException converted exception.
382     */
383    @SuppressWarnings({"ThrowableInstanceNeverThrown"})
384    protected ActionExecutorException convertException(Exception ex) {
385        if (ex instanceof ActionExecutorException) {
386            return (ActionExecutorException) ex;
387        }
388
389        ActionExecutorException aee = null;
390        // Check the cause of the exception first
391        if (ex.getCause() != null) {
392            aee = convertExceptionHelper(ex.getCause());
393        }
394        // If the cause isn't registered or doesn't exist, check the exception itself
395        if (aee == null) {
396            aee = convertExceptionHelper(ex);
397            // If the cause isn't registered either, then just create a new ActionExecutorException
398            if (aee == null) {
399                String exClass = ex.getClass().getName();
400                String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1);
401                aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex);
402            }
403        }
404        return aee;
405    }
406
407    private ActionExecutorException convertExceptionHelper(Throwable ex) {
408        Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
409        // Check if we have registered ex
410        ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName());
411        if (classErrorInfo != null) {
412            return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex);
413        }
414        // Else, check if a parent class of ex is registered
415        else {
416            for (ErrorInfo errorInfo : executorErrorInfo.values()) {
417                if (errorInfo.errorClass.isInstance(ex)) {
418                    return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex);
419                }
420            }
421        }
422        return null;
423    }
424
425    /**
426     * Convenience method that return the signal for an Action based on the action status.
427     *
428     * @param status action status.
429     * @return the action signal.
430     */
431    protected String getActionSignal(WorkflowAction.Status status) {
432        switch (status) {
433            case OK:
434                return "OK";
435            case ERROR:
436            case KILLED:
437                return "ERROR";
438            default:
439                throw new IllegalArgumentException("Action status for signal can only be OK or ERROR");
440        }
441    }
442
443    /**
444     * Return the path that will be used to store action specific data
445     *
446     * @param jobId Worfklow ID
447     * @param action Action
448     * @param key An Identifier
449     * @param temp temp directory flag
450     * @return A string that has the path
451     */
452    protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) {
453        String name = jobId + "/" + action.getName() + "--" + key;
454        if (temp) {
455            name += ".temp";
456        }
457        return getOozieSystemId() + "/" + name;
458    }
459
460    /**
461     * Return the path that will be used to store action specific data.
462     *
463     * @param jobId Workflow ID
464     * @param action Action
465     * @param key An identifier
466     * @param temp Temp directory flag
467     * @return Path to the directory
468     */
469    public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) {
470        return new Path(getActionDirPath(jobId, action, key, temp));
471    }
472
473    /**
474     * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the
475     * action has completed, the {@link Context#setExecutionData} method must be called within this method.
476     *
477     * @param context executor context.
478     * @param action the action to start.
479     * @throws ActionExecutorException thrown if the action could not start.
480     */
481    public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException;
482
483    /**
484     * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this
485     * method.
486     *
487     * @param context executor context.
488     * @param action the action to end.
489     * @throws ActionExecutorException thrown if the action could not end.
490     */
491    public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException;
492
493    /**
494     * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action
495     * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action
496     * has not completed, the {@link Context#setExternalStatus} method must be called within this method.
497     *
498     * @param context executor context.
499     * @param action the action to end.
500     * @throws ActionExecutorException thrown if the action could not be checked.
501     */
502    public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException;
503
504    /**
505     * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method.
506     *
507     * @param context executor context.
508     * @param action the action to kill.
509     * @throws ActionExecutorException thrown if the action could not be killed.
510     */
511    public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException;
512
513    /**
514     * Return if the external status indicates that the action has completed.
515     *
516     * @param externalStatus external status to check.
517     * @return if the external status indicates that the action has completed.
518     */
519    public abstract boolean isCompleted(String externalStatus);
520
521}