001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action;
019    
020    import org.apache.hadoop.fs.FileSystem;
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.conf.Configuration;
023    import org.apache.oozie.client.WorkflowAction;
024    import org.apache.oozie.client.WorkflowJob;
025    import org.apache.oozie.util.ELEvaluator;
026    import org.apache.oozie.util.ParamChecker;
027    import org.apache.oozie.util.XLog;
028    import org.apache.oozie.service.HadoopAccessorException;
029    import org.apache.oozie.service.Services;
030    import org.apache.oozie.servlet.CallbackServlet;
031    
032    import java.io.IOException;
033    import java.net.URISyntaxException;
034    import java.util.HashMap;
035    import java.util.Map;
036    import java.util.Properties;
037    import java.util.LinkedHashMap;
038    
039    /**
040     * Base action executor class. <p/> All the action executors should extend this class.
041     */
042    public abstract class ActionExecutor {
043    
044        /**
045         * Configuration prefix for action executor (sub-classes) properties.
046         */
047            public static final String CONF_PREFIX = "oozie.action.";
048    
049            public static final String MAX_RETRIES = CONF_PREFIX + "retries.max";
050    
051        /**
052         * Error code used by {@link #convertException} when there is not register error information for an exception.
053         */
054        public static final String ERROR_OTHER = "OTHER";
055    
056        private static class ErrorInfo {
057            ActionExecutorException.ErrorType errorType;
058            String errorCode;
059    
060            private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode) {
061                this.errorType = errorType;
062                this.errorCode = errorCode;
063            }
064        }
065    
066        private static boolean initMode = false;
067        private static Map<String, Map<Class, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<Class, ErrorInfo>>();
068    
069        /**
070         * Context information passed to the ActionExecutor methods.
071         */
072        public interface Context {
073    
074            /**
075             * Create the callback URL for the action.
076             *
077             * @param externalStatusVar variable for the caller to inject the external status.
078             * @return the callback URL.
079             */
080            public String getCallbackUrl(String externalStatusVar);
081    
082            /**
083             * Return a proto configuration for actions with auth properties already set.
084             *
085             * @return a proto configuration for actions with auth properties already set.
086             */
087            public Configuration getProtoActionConf();
088    
089            /**
090             * Return the workflow job.
091             *
092             * @return the workflow job.
093             */
094            public WorkflowJob getWorkflow();
095    
096            /**
097             * Return an ELEvaluator with the context injected.
098             *
099             * @return configured ELEvaluator.
100             */
101            public ELEvaluator getELEvaluator();
102    
103            /**
104             * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
105             * plus a '.'.
106             *
107             * @param name variable name.
108             * @param value variable value, <code>null</code> removes the variable.
109             */
110            public void setVar(String name, String value);
111    
112            /**
113             * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
114             * plus a '.'.
115             *
116             * @param name variable name.
117             * @return the variable value, <code>null</code> if not set.
118             */
119            public String getVar(String name);
120    
121            /**
122             * Set the action tracking information for an successfully started action.
123             *
124             * @param externalId the action external ID.
125             * @param trackerUri the action tracker URI.
126             * @param consoleUrl the action console URL.
127             */
128            void setStartData(String externalId, String trackerUri, String consoleUrl);
129    
130            /**
131             * Set the action execution completion information for an action. The action status is set to {@link
132             * org.apache.oozie.client.WorkflowAction.Status#DONE}
133             *
134             * @param externalStatus the action external end status.
135             * @param actionData the action data on completion, <code>null</code> if none.
136             */
137            void setExecutionData(String externalStatus, Properties actionData);
138    
139            /**
140             * Set execution statistics information for a particular action. The action status is set to {@link
141             * org.apache.oozie.client.WorkflowAction.Status#DONE}
142             *
143             * @param jsonStats the JSON string representation of the stats.
144             */
145            void setExecutionStats(String jsonStats);
146    
147            /**
148             * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link
149             * org.apache.oozie.client.WorkflowAction.Status#DONE}
150             *
151             * @param externalChildIDs the external child IDs as a comma-delimited string.
152             */
153            void setExternalChildIDs(String externalChildIDs);
154    
155            /**
156             * Set the action end completion information for a completed action.
157             *
158             * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or
159             * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}.
160             * @param signalValue the action external end status.
161             */
162            void setEndData(WorkflowAction.Status status, String signalValue);
163    
164            /**
165             * Return if the executor invocation is a retry or not.
166             *
167             * @return if the executor invocation is a retry or not.
168             */
169            boolean isRetry();
170    
171            /**
172             * Sets the external status for the action in context.
173             *
174             * @param externalStatus the external status.
175             */
176            void setExternalStatus(String externalStatus);
177    
178            /**
179             * Get the Action Recovery ID.
180             *
181             * @return recovery ID.
182             */
183            String getRecoveryId();
184    
185            /*
186             * @return the path that will be used to store action specific data
187             * @throws IOException @throws URISyntaxException @throws HadoopAccessorException
188             */
189            public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException;
190    
191            /**
192             * @return filesystem handle for the application deployment fs.
193             * @throws IOException
194             * @throws URISyntaxException
195             * @throws HadoopAccessorException
196             */
197            public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException;
198    
199            public void setErrorInfo(String str, String exMsg);
200        }
201    
202    
203        /**
204         * Define the default inteval in seconds between retries.
205         */
206        public static final long RETRY_INTERVAL = 60;
207    
208        private String type;
209        private int maxRetries;
210        private long retryInterval;
211    
212        /**
213         * Create an action executor with default retry parameters.
214         *
215         * @param type action executor type.
216         */
217        protected ActionExecutor(String type) {
218            this(type, RETRY_INTERVAL);
219        }
220    
221        /**
222         * Create an action executor.
223         *
224         * @param type action executor type.
225         * @param retryAttempts retry attempts.
226         * @param retryInterval retry interval, in seconds.
227         */
228        protected ActionExecutor(String type, long retryInterval) {
229            this.type = ParamChecker.notEmpty(type, "type");
230            this.maxRetries = getOozieConf().getInt(MAX_RETRIES, 3);
231            this.retryInterval = retryInterval;
232        }
233    
234        /**
235         * Clear all init settings for all action types.
236         */
237        public static void resetInitInfo() {
238            if (!initMode) {
239                throw new IllegalStateException("Error, action type info locked");
240            }
241            ERROR_INFOS.clear();
242        }
243    
244        /**
245         * Enable action type initialization.
246         */
247        public static void enableInit() {
248            initMode = true;
249        }
250    
251        /**
252         * Disable action type initialization.
253         */
254        public static void disableInit() {
255            initMode = false;
256        }
257    
258        /**
259         * Invoked once at system initialization time. <p/> It can be used to register error information for the expected
260         * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing
261         * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register
262         * all its possible errors. <p/> Subclasses overriding must invoke super.
263         */
264        public void initActionType() {
265            ERROR_INFOS.put(getType(), new LinkedHashMap<Class, ErrorInfo>());
266        }
267    
268        /**
269         * Return the system ID, this ID is defined in Oozie configuration.
270         *
271         * @return the system ID.
272         */
273        public String getOozieSystemId() {
274            return Services.get().getSystemId();
275        }
276    
277        /**
278         * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
279         * new directory per system initialization.
280         *
281         * @return the runtime directory of the Oozie instance.
282         */
283        public String getOozieRuntimeDir() {
284            return Services.get().getRuntimeDir();
285        }
286    
287        /**
288         * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties.
289         *
290         * @return Oozie configuration.
291         */
292        public Configuration getOozieConf() {
293            return Services.get().getConf();
294        }
295    
296        /**
297         * Register error handling information for an exception.
298         *
299         * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed
300         * to be able to handle multiple version of Hadoop  or other JARs used by executors with the same codebase).
301         * @param errorType error type for the exception.
302         * @param errorCode error code for the exception.
303         */
304        protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) {
305            if (!initMode) {
306                throw new IllegalStateException("Error, action type info locked");
307            }
308            try {
309                Class klass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
310                Map<Class, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
311                executorErrorInfo.put(klass, new ErrorInfo(errorType, errorCode));
312            }
313            catch (ClassNotFoundException ex) {
314                XLog.getLog(getClass()).warn(
315                        "Exception [{0}] no in classpath, ActionExecutor [{1}] will handled it as ERROR", exClass,
316                        getType());
317            }
318        }
319    
320        /**
321         * Return the action executor type.
322         *
323         * @return the action executor type.
324         */
325        public String getType() {
326            return type;
327        }
328    
329        /**
330         * Return the maximum number of retries for the action executor.
331         *
332         * @return the maximum number of retries for the action executor.
333         */
334        public int getMaxRetries() {
335            return maxRetries;
336        }
337    
338        /**
339         * Set the maximum number of retries for the action executor.
340         *
341         * @param maxRetries the maximum number of retries.
342         */
343        public void setMaxRetries(int maxRetries) {
344            this.maxRetries = maxRetries;
345        }
346    
347        /**
348         * Return the retry interval for the action executor in seconds.
349         *
350         * @return the retry interval for the action executor in seconds.
351         */
352        public long getRetryInterval() {
353            return retryInterval;
354        }
355    
356        /**
357         * Sets the retry interval for the action executor.
358         *
359         * @param retryInterval retry interval in seconds.
360         */
361        public void setRetryInterval(long retryInterval) {
362            this.retryInterval = retryInterval;
363        }
364    
365        /**
366         * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods
367         * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s.
368         *
369         * @param ex exception to convert.
370         * @return ActionExecutorException converted exception.
371         */
372        @SuppressWarnings({"ThrowableInstanceNeverThrown"})
373        protected ActionExecutorException convertException(Exception ex) {
374            if (ex instanceof ActionExecutorException) {
375                return (ActionExecutorException) ex;
376            }
377            for (Map.Entry<Class, ErrorInfo> errorInfo : ERROR_INFOS.get(getType()).entrySet()) {
378                if (errorInfo.getKey().isInstance(ex)) {
379                    return new ActionExecutorException(errorInfo.getValue().errorType, errorInfo.getValue().errorCode,
380                                                       "{0}", ex.getMessage(), ex);
381                }
382            }
383            String errorCode = ex.getClass().getName();
384            errorCode = errorCode.substring(errorCode.lastIndexOf(".") + 1);
385            return new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(),
386                                               ex);
387        }
388    
389        /**
390         * Convenience method that return the signal for an Action based on the action status.
391         *
392         * @param status action status.
393         * @return the action signal.
394         */
395        protected String getActionSignal(WorkflowAction.Status status) {
396            switch (status) {
397                case OK:
398                    return "OK";
399                case ERROR:
400                case KILLED:
401                    return "ERROR";
402                default:
403                    throw new IllegalArgumentException("Action status for signal can only be OK or ERROR");
404            }
405        }
406    
407        /**
408         * Return the path that will be used to store action specific data
409         *
410         * @param jobId Worfklow ID
411         * @param action Action
412         * @param key An Identifier
413         * @param temp temp directory flag
414         * @return A string that has the path
415         */
416        protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) {
417            String name = jobId + "/" + action.getName() + "--" + key;
418            if (temp) {
419                name += ".temp";
420            }
421            return getOozieSystemId() + "/" + name;
422        }
423    
424        /**
425         * Return the path that will be used to store action specific data.
426         *
427         * @param jobId Workflow ID
428         * @param action Action
429         * @param key An identifier
430         * @param temp Temp directory flag
431         * @return Path to the directory
432         */
433        public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) {
434            return new Path(getActionDirPath(jobId, action, key, temp));
435        }
436    
437        /**
438         * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the
439         * action has completed, the {@link Context#setExecutionData} method must be called within this method.
440         *
441         * @param context executor context.
442         * @param action the action to start.
443         * @throws ActionExecutorException thrown if the action could not start.
444         */
445        public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException;
446    
447        /**
448         * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this
449         * method.
450         *
451         * @param context executor context.
452         * @param action the action to end.
453         * @throws ActionExecutorException thrown if the action could not end.
454         */
455        public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException;
456    
457        /**
458         * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action
459         * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action
460         * has not completed, the {@link Context#setExternalStatus} method must be called within this method.
461         *
462         * @param context executor context.
463         * @param action the action to end.
464         * @throws ActionExecutorException thrown if the action could not be checked.
465         */
466        public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException;
467    
468        /**
469         * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method.
470         *
471         * @param context executor context.
472         * @param action the action to kill.
473         * @throws ActionExecutorException thrown if the action could not be killed.
474         */
475        public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException;
476    
477        /**
478         * Return if the external status indicates that the action has completed.
479         *
480         * @param externalStatus external status to check.
481         * @return if the external status indicates that the action has completed.
482         */
483        public abstract boolean isCompleted(String externalStatus);
484    
485    }