001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action;
020
021import org.apache.commons.lang.StringUtils;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.oozie.client.WorkflowAction;
026import org.apache.oozie.client.WorkflowJob;
027import org.apache.oozie.service.ConfigurationService;
028import org.apache.oozie.util.ELEvaluator;
029import org.apache.oozie.util.ParamChecker;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.service.HadoopAccessorException;
032import org.apache.oozie.service.Services;
033
034import java.io.ByteArrayOutputStream;
035import java.io.IOException;
036import java.io.PrintStream;
037import java.net.URISyntaxException;
038import java.util.HashMap;
039import java.util.Map;
040import java.util.Properties;
041import java.util.LinkedHashMap;
042
043/**
044 * Base action executor class. <p> All the action executors should extend this class.
045 */
046public abstract class ActionExecutor {
047
048    /**
049     * Configuration prefix for action executor (sub-classes) properties.
050     */
051        public static final String CONF_PREFIX = "oozie.action.";
052
053    public static final String MAX_RETRIES = CONF_PREFIX + "retries.max";
054
055    public static final String ACTION_RETRY_INTERVAL = CONF_PREFIX + "retry.interval";
056
057    public static final String ACTION_RETRY_POLICY = CONF_PREFIX + "retry.policy";
058
059    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
060
061
062    /**
063     * Error code used by {@link #convertException} when there is not register error information for an exception.
064     */
065    public static final String ERROR_OTHER = "OTHER";
066
067    public static enum RETRYPOLICY {
068        EXPONENTIAL, PERIODIC
069    }
070
071    private static class ErrorInfo {
072        ActionExecutorException.ErrorType errorType;
073        String errorCode;
074        Class<?> errorClass;
075
076        private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) {
077            this.errorType = errorType;
078            this.errorCode = errorCode;
079            this.errorClass = errorClass;
080        }
081    }
082
083    private static boolean initMode = false;
084    private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>();
085
086    /**
087     * Context information passed to the ActionExecutor methods.
088     */
089    public interface Context {
090
091        /**
092         * Create the callback URL for the action.
093         *
094         * @param externalStatusVar variable for the caller to inject the external status.
095         * @return the callback URL.
096         */
097        String getCallbackUrl(String externalStatusVar);
098
099        /**
100         * Return a proto configuration for actions with auth properties already set.
101         *
102         * @return a proto configuration for actions with auth properties already set.
103         */
104        Configuration getProtoActionConf();
105
106        /**
107         * Return the workflow job.
108         *
109         * @return the workflow job.
110         */
111        WorkflowJob getWorkflow();
112
113        /**
114         * Return an ELEvaluator with the context injected.
115         *
116         * @return configured ELEvaluator.
117         */
118        ELEvaluator getELEvaluator();
119
120        /**
121         * Set a workflow action variable. <p> Convenience method that prefixes the variable name with the action name
122         * plus a '.'.
123         *
124         * @param name variable name.
125         * @param value variable value, <code>null</code> removes the variable.
126         */
127        void setVar(String name, String value);
128
129        /**
130         * Get a workflow action variable. <p> Convenience method that prefixes the variable name with the action name
131         * plus a '.'.
132         *
133         * @param name variable name.
134         * @return the variable value, <code>null</code> if not set.
135         */
136        String getVar(String name);
137
138        /**
139         * Set the action tracking information for an successfully started action.
140         *
141         * @param externalId the action external ID.
142         * @param trackerUri the action tracker URI.
143         * @param consoleUrl the action console URL.
144         */
145        void setStartData(String externalId, String trackerUri, String consoleUrl);
146
147        /**
148         * Set the action execution completion information for an action. The action status is set to {@link
149         * org.apache.oozie.client.WorkflowAction.Status#DONE}
150         *
151         * @param externalStatus the action external end status.
152         * @param actionData the action data on completion, <code>null</code> if none.
153         */
154        void setExecutionData(String externalStatus, Properties actionData);
155
156        /**
157         * Set execution statistics information for a particular action. The action status is set to {@link
158         * org.apache.oozie.client.WorkflowAction.Status#DONE}
159         *
160         * @param jsonStats the JSON string representation of the stats.
161         */
162        void setExecutionStats(String jsonStats);
163
164        /**
165         * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link
166         * org.apache.oozie.client.WorkflowAction.Status#DONE}
167         *
168         * @param externalChildIDs the external child IDs as a comma-delimited string.
169         */
170        void setExternalChildIDs(String externalChildIDs);
171
172        /**
173         * Set the action end completion information for a completed action.
174         *
175         * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or
176         * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}.
177         * @param signalValue the action external end status.
178         */
179        void setEndData(WorkflowAction.Status status, String signalValue);
180
181        /**
182         * Return if the executor invocation is a retry or not.
183         *
184         * @return if the executor invocation is a retry or not.
185         */
186        boolean isRetry();
187
188        /**
189         * Sets the external status for the action in context.
190         *
191         * @param externalStatus the external status.
192         */
193        void setExternalStatus(String externalStatus);
194
195        /**
196         * Get the Action Recovery ID.
197         *
198         * @return recovery ID.
199         */
200        String getRecoveryId();
201
202        /*
203         * @return the path that will be used to store action specific data
204         * @throws IOException @throws URISyntaxException @throws HadoopAccessorException
205         */
206        Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException;
207
208        /**
209         * @return filesystem handle for the application deployment fs.
210         * @throws IOException
211         * @throws URISyntaxException
212         * @throws HadoopAccessorException
213         */
214        FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException;
215
216        void setErrorInfo(String str, String exMsg);
217    }
218
219
220    /**
221     * Define the default inteval in seconds between retries.
222     */
223    public static final long RETRY_INTERVAL = 60;
224
225    private String type;
226    private int maxRetries;
227    private long retryInterval;
228    private RETRYPOLICY retryPolicy;
229
230    /**
231     * Create an action executor with default retry parameters.
232     *
233     * @param type action executor type.
234     */
235    protected ActionExecutor(String type) {
236        this(type, RETRY_INTERVAL);
237    }
238
239    /**
240     * Create an action executor.
241     *
242     * @param type action executor type.
243     * @param defaultRetryInterval retry interval, in seconds.
244     */
245    protected ActionExecutor(String type, long defaultRetryInterval) {
246        this.type = ParamChecker.notEmpty(type, "type");
247        this.maxRetries = ConfigurationService.getInt(MAX_RETRIES);
248        int retryInterval = ConfigurationService.getInt(ACTION_RETRY_INTERVAL);
249        this.retryInterval = retryInterval > 0 ? retryInterval : defaultRetryInterval;
250        this.retryPolicy = getRetryPolicyFromConf();
251    }
252
253    private RETRYPOLICY getRetryPolicyFromConf() {
254        String retryPolicy = ConfigurationService.get(ACTION_RETRY_POLICY);
255        if (StringUtils.isBlank(retryPolicy)) {
256            return RETRYPOLICY.PERIODIC;
257        } else {
258            try {
259                return RETRYPOLICY.valueOf(retryPolicy.toUpperCase().trim());
260            } catch (IllegalArgumentException e) {
261                return RETRYPOLICY.PERIODIC;
262            }
263        }
264    }
265
266    /**
267     * Clear all init settings for all action types.
268     */
269    public static void resetInitInfo() {
270        if (!initMode) {
271            throw new IllegalStateException("Error, action type info locked");
272        }
273        ERROR_INFOS.clear();
274    }
275
276    /**
277     * Enable action type initialization.
278     */
279    public static void enableInit() {
280        initMode = true;
281    }
282
283    /**
284     * Disable action type initialization.
285     */
286    public static void disableInit() {
287        initMode = false;
288    }
289
290    /**
291     * Invoked once at system initialization time. <p> It can be used to register error information for the expected
292     * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing
293     * that it is done in a normal catch. <p> This method should invoke the {@link #registerError} method to register
294     * all its possible errors. <p> Subclasses overriding must invoke super.
295     */
296    public void initActionType() {
297        XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType());
298        ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>());
299    }
300
301    /**
302     * Return the system ID, this ID is defined in Oozie configuration.
303     *
304     * @return the system ID.
305     */
306    public String getOozieSystemId() {
307        return Services.get().getSystemId();
308    }
309
310    /**
311     * Return the runtime directory of the Oozie instance. <p> The directory is created under TMP and it is always a
312     * new directory per system initialization.
313     *
314     * @return the runtime directory of the Oozie instance.
315     */
316    public String getOozieRuntimeDir() {
317        return Services.get().getRuntimeDir();
318    }
319
320    /**
321     * Return Oozie configuration. <p> This is useful for actions that need access to configuration properties.
322     *
323     * @return Oozie configuration.
324     */
325    public Configuration getOozieConf() {
326        return Services.get().getConf();
327    }
328
329    /**
330     * Register error handling information for an exception.
331     *
332     * @param exClass exception class name (to work in case of a particular exception not being in the classpath, needed
333     * to be able to handle multiple version of Hadoop  or other JARs used by executors with the same codebase).
334     * @param errorType error type for the exception.
335     * @param errorCode error code for the exception.
336     */
337    protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) {
338        if (!initMode) {
339            throw new IllegalStateException("Error, action type info locked");
340        }
341        try {
342            Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
343            Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
344            executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass));
345        }
346        catch (ClassNotFoundException cnfe) {
347            XLog.getLog(getClass()).warn(
348                    "Exception [{0}] not in classpath, ActionExecutor [{1}] will handle it as ERROR", exClass,
349                    getType());
350        }
351        catch (java.lang.NoClassDefFoundError err) {
352            ByteArrayOutputStream baos = new ByteArrayOutputStream();
353            err.printStackTrace(new PrintStream(baos));
354            XLog.getLog(getClass()).warn(baos.toString());
355        }
356    }
357
358    /**
359     * Return the action executor type.
360     *
361     * @return the action executor type.
362     */
363    public String getType() {
364        return type;
365    }
366
367    /**
368     * Return the maximum number of retries for the action executor.
369     *
370     * @return the maximum number of retries for the action executor.
371     */
372    public int getMaxRetries() {
373        return maxRetries;
374    }
375
376    /**
377     * Set the maximum number of retries for the action executor.
378     *
379     * @param maxRetries the maximum number of retries.
380     */
381    public void setMaxRetries(int maxRetries) {
382        this.maxRetries = maxRetries;
383    }
384
385    /**
386     * Return the retry policy for the action executor.
387     *
388     * @return the retry policy for the action executor.
389     */
390    public RETRYPOLICY getRetryPolicy() {
391        return retryPolicy;
392    }
393
394    /**
395     * Sets the retry policy for the action executor.
396     *
397     * @param retryPolicy retry policy for the action executor.
398     */
399    public void setRetryPolicy(RETRYPOLICY retryPolicy) {
400        this.retryPolicy = retryPolicy;
401    }
402
403    /**
404     * Return the retry interval for the action executor in seconds.
405     *
406     * @return the retry interval for the action executor in seconds.
407     */
408    public long getRetryInterval() {
409        return retryInterval;
410    }
411
412    /**
413     * Sets the retry interval for the action executor.
414     *
415     * @param retryInterval retry interval in seconds.
416     */
417    public void setRetryInterval(long retryInterval) {
418        this.retryInterval = retryInterval;
419    }
420
421    /**
422     * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods
423     * <p> It uses the error registry to convert exceptions to {@link ActionExecutorException}s.
424     *
425     * @param ex exception to convert.
426     * @return ActionExecutorException converted exception.
427     */
428    @SuppressWarnings({"ThrowableInstanceNeverThrown"})
429    protected ActionExecutorException convertException(Exception ex) {
430        if (ex instanceof ActionExecutorException) {
431            return (ActionExecutorException) ex;
432        }
433
434        ActionExecutorException aee = null;
435        // Check the cause of the exception first
436        if (ex.getCause() != null) {
437            aee = convertExceptionHelper(ex.getCause());
438        }
439        // If the cause isn't registered or doesn't exist, check the exception itself
440        if (aee == null) {
441            aee = convertExceptionHelper(ex);
442            // If the cause isn't registered either, then just create a new ActionExecutorException
443            if (aee == null) {
444                String exClass = ex.getClass().getName();
445                String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1);
446                aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex);
447            }
448        }
449        return aee;
450    }
451
452    private ActionExecutorException convertExceptionHelper(Throwable ex) {
453        Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
454        // Check if we have registered ex
455        ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName());
456        if (classErrorInfo != null) {
457            return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex);
458        }
459        // Else, check if a parent class of ex is registered
460        else {
461            for (ErrorInfo errorInfo : executorErrorInfo.values()) {
462                if (errorInfo.errorClass.isInstance(ex)) {
463                    return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex);
464                }
465            }
466        }
467        return null;
468    }
469
470    /**
471     * Convenience method that return the signal for an Action based on the action status.
472     *
473     * @param status action status.
474     * @return the action signal.
475     */
476    protected String getActionSignal(WorkflowAction.Status status) {
477        switch (status) {
478            case OK:
479                return "OK";
480            case ERROR:
481            case KILLED:
482                return "ERROR";
483            default:
484                throw new IllegalArgumentException("Action status for signal can only be OK or ERROR");
485        }
486    }
487
488    /**
489     * Return the path that will be used to store action specific data
490     *
491     * @param jobId Worfklow ID
492     * @param action Action
493     * @param key An Identifier
494     * @param temp temp directory flag
495     * @return A string that has the path
496     */
497    protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) {
498        String name = jobId + "/" + action.getName() + "--" + key;
499        if (temp) {
500            name += ".temp";
501        }
502        return getOozieSystemId() + "/" + name;
503    }
504
505    /**
506     * Return the path that will be used to store action specific data.
507     *
508     * @param jobId Workflow ID
509     * @param action Action
510     * @param key An identifier
511     * @param temp Temp directory flag
512     * @return Path to the directory
513     */
514    public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) {
515        return new Path(getActionDirPath(jobId, action, key, temp));
516    }
517
518    /**
519     * Start an action. <p> The {@link Context#setStartData} method must be called within this method. <p> If the
520     * action has completed, the {@link Context#setExecutionData} method must be called within this method.
521     *
522     * @param context executor context.
523     * @param action the action to start.
524     * @throws ActionExecutorException thrown if the action could not start.
525     */
526    public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException;
527
528    /**
529     * End an action after it has executed. <p> The {@link Context#setEndData} method must be called within this
530     * method.
531     *
532     * @param context executor context.
533     * @param action the action to end.
534     * @throws ActionExecutorException thrown if the action could not end.
535     */
536    public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException;
537
538    /**
539     * Check if an action has completed. This method must be implemented by Async Action Executors. <p> If the action
540     * has completed, the {@link Context#setExecutionData} method must be called within this method. <p> If the action
541     * has not completed, the {@link Context#setExternalStatus} method must be called within this method.
542     *
543     * @param context executor context.
544     * @param action the action to end.
545     * @throws ActionExecutorException thrown if the action could not be checked.
546     */
547    public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException;
548
549    /**
550     * Kill an action. <p> The {@link Context#setEndData} method must be called within this method.
551     *
552     * @param context executor context.
553     * @param action the action to kill.
554     * @throws ActionExecutorException thrown if the action could not be killed.
555     */
556    public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException;
557
558    /**
559     * Return if the external status indicates that the action has completed.
560     *
561     * @param externalStatus external status to check.
562     * @return if the external status indicates that the action has completed.
563     */
564    public abstract boolean isCompleted(String externalStatus);
565
566    /**
567     * Returns true if this action type requires a NameNode and JobTracker.  These can either be specified directly in the action
568     * via &lt;name-node&gt; and &lt;job-tracker&gt;, from the fields in the global section, or from their default values.  If
569     * false, Oozie won't ensure (i.e. won't throw an Exception if non-existant) that this action type has these values.
570     *
571     * @return true if a NameNode and JobTracker are required; false if not
572     */
573    public boolean requiresNameNodeJobTracker() {
574        return false;
575    }
576
577    /**
578     * Returns true if this action type supports a Configuration and JobXML.  In this case, Oozie will include the
579     * &lt;configuration&gt; and &lt;job-xml&gt; elements from the global section (if provided) with the action.  If false, Oozie
580     * won't add these.
581     *
582     * @return true if the global section's Configuration and JobXML should be given; false if not
583     */
584    public boolean supportsConfigurationJobXML() {
585        return false;
586    }
587
588    /**
589     * Creating and forwarding the tag, It will be useful during repeat attempts of Launcher, to ensure only
590     * one child job is running. Tag is formed as follows:
591     * For workflow job, tag = action-id
592     * For Coord job, tag = coord-action-id@action-name (if not part of sub flow), else
593     * coord-action-id@subflow-action-name@action-name.
594     * @param conf the conf
595     * @param wfJob the wf job
596     * @param action the action
597     * @return the action yarn tag
598     */
599    public String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) {
600        if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
601            return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName();
602        }
603        else if (wfJob.getParentId() != null) {
604            return wfJob.getParentId() + "@" + action.getName();
605        }
606        else {
607            return action.getId();
608        }
609    }
610}