001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action;
019    
020    import org.apache.hadoop.fs.FileSystem;
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.conf.Configuration;
023    import org.apache.oozie.client.WorkflowAction;
024    import org.apache.oozie.client.WorkflowJob;
025    import org.apache.oozie.util.ELEvaluator;
026    import org.apache.oozie.util.ParamChecker;
027    import org.apache.oozie.util.XLog;
028    import org.apache.oozie.service.HadoopAccessorException;
029    import org.apache.oozie.service.Services;
030    
031    import java.io.IOException;
032    import java.net.URISyntaxException;
033    import java.util.HashMap;
034    import java.util.Map;
035    import java.util.Properties;
036    import java.util.LinkedHashMap;
037    
038    /**
039     * Base action executor class. <p/> All the action executors should extend this class.
040     */
041    public abstract class ActionExecutor {
042    
043        /**
044         * Configuration prefix for action executor (sub-classes) properties.
045         */
046        public static final String CONF_PREFIX = "oozie.action.";
047    
048        /**
049         * Error code used by {@link #convertException} when there is not register error information for an exception.
050         */
051        public static final String ERROR_OTHER = "OTHER";
052    
053        private static class ErrorInfo {
054            ActionExecutorException.ErrorType errorType;
055            String errorCode;
056    
057            private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode) {
058                this.errorType = errorType;
059                this.errorCode = errorCode;
060            }
061        }
062    
063        private static boolean initMode = false;
064        private static Map<String, Map<Class, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<Class, ErrorInfo>>();
065    
066        /**
067         * Context information passed to the ActionExecutor methods.
068         */
069        public interface Context {
070    
071            /**
072             * Create the callback URL for the action.
073             *
074             * @param externalStatusVar variable for the caller to inject the external status.
075             * @return the callback URL.
076             */
077            public String getCallbackUrl(String externalStatusVar);
078    
079            /**
080             * Return a proto configuration for actions with auth properties already set.
081             *
082             * @return a proto configuration for actions with auth properties already set.
083             */
084            public Configuration getProtoActionConf();
085    
086            /**
087             * Return the workflow job.
088             *
089             * @return the workflow job.
090             */
091            public WorkflowJob getWorkflow();
092    
093            /**
094             * Return an ELEvaluator with the context injected.
095             *
096             * @return configured ELEvaluator.
097             */
098            public ELEvaluator getELEvaluator();
099    
100            /**
101             * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
102             * plus a '.'.
103             *
104             * @param name variable name.
105             * @param value variable value, <code>null</code> removes the variable.
106             */
107            public void setVar(String name, String value);
108    
109            /**
110             * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
111             * plus a '.'.
112             *
113             * @param name variable name.
114             * @return the variable value, <code>null</code> if not set.
115             */
116            public String getVar(String name);
117    
118            /**
119             * Set the action tracking information for an successfully started action.
120             *
121             * @param externalId the action external ID.
122             * @param trackerUri the action tracker URI.
123             * @param consoleUrl the action console URL.
124             */
125            void setStartData(String externalId, String trackerUri, String consoleUrl);
126    
127            /**
128             * Set the action execution completion information for an action. The action status is set to {@link
129             * org.apache.oozie.client.WorkflowAction.Status#DONE}
130             *
131             * @param externalStatus the action external end status.
132             * @param actionData the action data on completion, <code>null</code> if none.
133             */
134            void setExecutionData(String externalStatus, Properties actionData);
135    
136            /**
137             * Set the action end completion information for a completed action.
138             *
139             * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or
140             * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}.
141             * @param signalValue the action external end status.
142             */
143            void setEndData(WorkflowAction.Status status, String signalValue);
144    
145            /**
146             * Return if the executor invocation is a retry or not.
147             *
148             * @return if the executor invocation is a retry or not.
149             */
150            boolean isRetry();
151    
152            /**
153             * Sets the external status for the action in context.
154             *
155             * @param externalStatus the external status.
156             */
157            void setExternalStatus(String externalStatus);
158    
159            /**
160             * Get the Action Recovery ID.
161             *
162             * @return recovery ID.
163             */
164            String getRecoveryId();
165    
166            /*
167             * @return the path that will be used to store action specific data
168             * @throws IOException @throws URISyntaxException @throws HadoopAccessorException
169             */
170            public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException;
171    
172            /**
173             * @return filesystem handle for the application deployment fs.
174             * @throws IOException
175             * @throws URISyntaxException
176             * @throws HadoopAccessorException
177             */
178            public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException;
179    
180            public void setErrorInfo(String str, String exMsg);
181        }
182    
183        /**
184         * Define the default maximum number of retry attempts for transient errors (total attempts = 1 + MAX_RETRIES).
185         */
186        public static final int MAX_RETRIES = 3;
187    
188        /**
189         * Define the default inteval in seconds between retries.
190         */
191        public static final long RETRY_INTERVAL = 60;
192    
193        private String type;
194        private int maxRetries;
195        private long retryInterval;
196    
197        /**
198         * Create an action executor with default retry parameters.
199         *
200         * @param type action executor type.
201         */
202        protected ActionExecutor(String type) {
203            this(type, MAX_RETRIES, RETRY_INTERVAL);
204        }
205    
206        /**
207         * Create an action executor.
208         *
209         * @param type action executor type.
210         * @param retryAttempts retry attempts.
211         * @param retryInterval retry interval, in seconds.
212         */
213        protected ActionExecutor(String type, int retryAttempts, long retryInterval) {
214            this.type = ParamChecker.notEmpty(type, "type");
215            this.maxRetries = retryAttempts;
216            this.retryInterval = retryInterval;
217        }
218    
219        /**
220         * Clear all init settings for all action types.
221         */
222        public static void resetInitInfo() {
223            if (!initMode) {
224                throw new IllegalStateException("Error, action type info locked");
225            }
226            ERROR_INFOS.clear();
227        }
228    
229        /**
230         * Enable action type initialization.
231         */
232        public static void enableInit() {
233            initMode = true;
234        }
235    
236        /**
237         * Disable action type initialization.
238         */
239        public static void disableInit() {
240            initMode = false;
241        }
242    
243        /**
244         * Invoked once at system initialization time. <p/> It can be used to register error information for the expected
245         * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing
246         * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register
247         * all its possible errors. <p/> Subclasses overriding must invoke super.
248         */
249        public void initActionType() {
250            ERROR_INFOS.put(getType(), new LinkedHashMap<Class, ErrorInfo>());
251        }
252    
253        /**
254         * Return the system ID, this ID is defined in Oozie configuration.
255         *
256         * @return the system ID.
257         */
258        public String getOozieSystemId() {
259            return Services.get().getSystemId();
260        }
261    
262        /**
263         * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
264         * new directory per system initialization.
265         *
266         * @return the runtime directory of the Oozie instance.
267         */
268        public String getOozieRuntimeDir() {
269            return Services.get().getRuntimeDir();
270        }
271    
272        /**
273         * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties.
274         *
275         * @return Oozie configuration.
276         */
277        public Configuration getOozieConf() {
278            return Services.get().getConf();
279        }
280    
281        /**
282         * Register error handling information for an exception.
283         *
284         * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed
285         * to be able to handle multiple version of Hadoop  or other JARs used by executors with the same codebase).
286         * @param errorType error type for the exception.
287         * @param errorCode error code for the exception.
288         */
289        protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) {
290            if (!initMode) {
291                throw new IllegalStateException("Error, action type info locked");
292            }
293            try {
294                Class klass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
295                Map<Class, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
296                executorErrorInfo.put(klass, new ErrorInfo(errorType, errorCode));
297            }
298            catch (ClassNotFoundException ex) {
299                XLog.getLog(getClass()).warn(
300                        "Exception [{0}] no in classpath, ActionExecutor [{1}] will handled it as ERROR", exClass,
301                        getType());
302            }
303        }
304    
305        /**
306         * Return the action executor type.
307         *
308         * @return the action executor type.
309         */
310        public String getType() {
311            return type;
312        }
313    
314        /**
315         * Return the maximum number of retries for the action executor.
316         *
317         * @return the maximum number of retries for the action executor.
318         */
319        public int getMaxRetries() {
320            return maxRetries;
321        }
322    
323        /**
324         * Set the maximum number of retries for the action executor.
325         *
326         * @param maxRetries the maximum number of retries.
327         */
328        public void setMaxRetries(int maxRetries) {
329            this.maxRetries = maxRetries;
330        }
331    
332        /**
333         * Return the retry interval for the action executor in seconds.
334         *
335         * @return the retry interval for the action executor in seconds.
336         */
337        public long getRetryInterval() {
338            return retryInterval;
339        }
340    
341        /**
342         * Sets the retry interval for the action executor.
343         *
344         * @param retryInterval retry interval in seconds.
345         */
346        public void setRetryInterval(long retryInterval) {
347            this.retryInterval = retryInterval;
348        }
349    
350        /**
351         * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods
352         * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s.
353         *
354         * @param ex exception to convert.
355         * @return ActionExecutorException converted exception.
356         */
357        @SuppressWarnings({"ThrowableInstanceNeverThrown"})
358        protected ActionExecutorException convertException(Exception ex) {
359            if (ex instanceof ActionExecutorException) {
360                return (ActionExecutorException) ex;
361            }
362            for (Map.Entry<Class, ErrorInfo> errorInfo : ERROR_INFOS.get(getType()).entrySet()) {
363                if (errorInfo.getKey().isInstance(ex)) {
364                    return new ActionExecutorException(errorInfo.getValue().errorType, errorInfo.getValue().errorCode,
365                                                       "{0}", ex.getMessage(), ex);
366                }
367            }
368            String errorCode = ex.getClass().getName();
369            errorCode = errorCode.substring(errorCode.lastIndexOf(".") + 1);
370            return new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(),
371                                               ex);
372        }
373    
374        /**
375         * Convenience method that return the signal for an Action based on the action status.
376         *
377         * @param status action status.
378         * @return the action signal.
379         */
380        protected String getActionSignal(WorkflowAction.Status status) {
381            switch (status) {
382                case OK:
383                    return "OK";
384                case ERROR:
385                case KILLED:
386                    return "ERROR";
387                default:
388                    throw new IllegalArgumentException("Action status for signal can only be OK or ERROR");
389            }
390        }
391    
392        /**
393         * Return the path that will be used to store action specific data
394         *
395         * @param jobId Worfklow ID
396         * @param action Action
397         * @param key An Identifier
398         * @param temp temp directory flag
399         * @return A string that has the path
400         */
401        protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) {
402            String name = jobId + "/" + action.getName() + "--" + key;
403            if (temp) {
404                name += ".temp";
405            }
406            return getOozieSystemId() + "/" + name;
407        }
408    
409        /**
410         * Return the path that will be used to store action specific data.
411         *
412         * @param jobId Workflow ID
413         * @param action Action
414         * @param key An identifier
415         * @param temp Temp directory flag
416         * @return Path to the directory
417         */
418        public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) {
419            return new Path(getActionDirPath(jobId, action, key, temp));
420        }
421    
422        /**
423         * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the
424         * action has completed, the {@link Context#setExecutionData} method must be called within this method.
425         *
426         * @param context executor context.
427         * @param action the action to start.
428         * @throws ActionExecutorException thrown if the action could not start.
429         */
430        public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException;
431    
432        /**
433         * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this
434         * method.
435         *
436         * @param context executor context.
437         * @param action the action to end.
438         * @throws ActionExecutorException thrown if the action could not end.
439         */
440        public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException;
441    
442        /**
443         * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action
444         * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action
445         * has not completed, the {@link Context#setExternalStatus} method must be called within this method.
446         *
447         * @param context executor context.
448         * @param action the action to end.
449         * @throws ActionExecutorException thrown if the action could not be checked.
450         */
451        public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException;
452    
453        /**
454         * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method.
455         *
456         * @param context executor context.
457         * @param action the action to kill.
458         * @throws ActionExecutorException thrown if the action could not be killed.
459         */
460        public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException;
461    
462        /**
463         * Return if the external status indicates that the action has completed.
464         *
465         * @param externalStatus external status to check.
466         * @return if the external status indicates that the action has completed.
467         */
468        public abstract boolean isCompleted(String externalStatus);
469    
470    }