001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action;
019    
020    import org.apache.hadoop.fs.FileSystem;
021    import org.apache.hadoop.fs.Path;
022    import org.apache.hadoop.conf.Configuration;
023    import org.apache.oozie.client.WorkflowAction;
024    import org.apache.oozie.client.WorkflowJob;
025    import org.apache.oozie.util.ELEvaluator;
026    import org.apache.oozie.util.ParamChecker;
027    import org.apache.oozie.util.XLog;
028    import org.apache.oozie.service.HadoopAccessorException;
029    import org.apache.oozie.service.Services;
030    
031    import java.io.ByteArrayOutputStream;
032    import java.io.IOException;
033    import java.io.PrintStream;
034    import java.net.URISyntaxException;
035    import java.util.HashMap;
036    import java.util.Map;
037    import java.util.Properties;
038    import java.util.LinkedHashMap;
039    
040    /**
041     * Base action executor class. <p/> All the action executors should extend this class.
042     */
043    public abstract class ActionExecutor {
044    
045        /**
046         * Configuration prefix for action executor (sub-classes) properties.
047         */
048            public static final String CONF_PREFIX = "oozie.action.";
049    
050            public static final String MAX_RETRIES = CONF_PREFIX + "retries.max";
051    
052        /**
053         * Error code used by {@link #convertException} when there is not register error information for an exception.
054         */
055        public static final String ERROR_OTHER = "OTHER";
056        
057        public boolean requiresNNJT = false;
058    
059        private static class ErrorInfo {
060            ActionExecutorException.ErrorType errorType;
061            String errorCode;
062            Class<?> errorClass;
063    
064            private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) {
065                this.errorType = errorType;
066                this.errorCode = errorCode;
067                this.errorClass = errorClass;
068            }
069        }
070    
071        private static boolean initMode = false;
072        private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>();
073    
074        /**
075         * Context information passed to the ActionExecutor methods.
076         */
077        public interface Context {
078    
079            /**
080             * Create the callback URL for the action.
081             *
082             * @param externalStatusVar variable for the caller to inject the external status.
083             * @return the callback URL.
084             */
085            public String getCallbackUrl(String externalStatusVar);
086    
087            /**
088             * Return a proto configuration for actions with auth properties already set.
089             *
090             * @return a proto configuration for actions with auth properties already set.
091             */
092            public Configuration getProtoActionConf();
093    
094            /**
095             * Return the workflow job.
096             *
097             * @return the workflow job.
098             */
099            public WorkflowJob getWorkflow();
100    
101            /**
102             * Return an ELEvaluator with the context injected.
103             *
104             * @return configured ELEvaluator.
105             */
106            public ELEvaluator getELEvaluator();
107    
108            /**
109             * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
110             * plus a '.'.
111             *
112             * @param name variable name.
113             * @param value variable value, <code>null</code> removes the variable.
114             */
115            public void setVar(String name, String value);
116    
117            /**
118             * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
119             * plus a '.'.
120             *
121             * @param name variable name.
122             * @return the variable value, <code>null</code> if not set.
123             */
124            public String getVar(String name);
125    
126            /**
127             * Set the action tracking information for an successfully started action.
128             *
129             * @param externalId the action external ID.
130             * @param trackerUri the action tracker URI.
131             * @param consoleUrl the action console URL.
132             */
133            void setStartData(String externalId, String trackerUri, String consoleUrl);
134    
135            /**
136             * Set the action execution completion information for an action. The action status is set to {@link
137             * org.apache.oozie.client.WorkflowAction.Status#DONE}
138             *
139             * @param externalStatus the action external end status.
140             * @param actionData the action data on completion, <code>null</code> if none.
141             */
142            void setExecutionData(String externalStatus, Properties actionData);
143    
144            /**
145             * Set execution statistics information for a particular action. The action status is set to {@link
146             * org.apache.oozie.client.WorkflowAction.Status#DONE}
147             *
148             * @param jsonStats the JSON string representation of the stats.
149             */
150            void setExecutionStats(String jsonStats);
151    
152            /**
153             * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link
154             * org.apache.oozie.client.WorkflowAction.Status#DONE}
155             *
156             * @param externalChildIDs the external child IDs as a comma-delimited string.
157             */
158            void setExternalChildIDs(String externalChildIDs);
159    
160            /**
161             * Set the action end completion information for a completed action.
162             *
163             * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or
164             * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}.
165             * @param signalValue the action external end status.
166             */
167            void setEndData(WorkflowAction.Status status, String signalValue);
168    
169            /**
170             * Return if the executor invocation is a retry or not.
171             *
172             * @return if the executor invocation is a retry or not.
173             */
174            boolean isRetry();
175    
176            /**
177             * Sets the external status for the action in context.
178             *
179             * @param externalStatus the external status.
180             */
181            void setExternalStatus(String externalStatus);
182    
183            /**
184             * Get the Action Recovery ID.
185             *
186             * @return recovery ID.
187             */
188            String getRecoveryId();
189    
190            /*
191             * @return the path that will be used to store action specific data
192             * @throws IOException @throws URISyntaxException @throws HadoopAccessorException
193             */
194            public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException;
195    
196            /**
197             * @return filesystem handle for the application deployment fs.
198             * @throws IOException
199             * @throws URISyntaxException
200             * @throws HadoopAccessorException
201             */
202            public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException;
203    
204            public void setErrorInfo(String str, String exMsg);
205        }
206    
207    
208        /**
209         * Define the default inteval in seconds between retries.
210         */
211        public static final long RETRY_INTERVAL = 60;
212    
213        private String type;
214        private int maxRetries;
215        private long retryInterval;
216    
217        /**
218         * Create an action executor with default retry parameters.
219         *
220         * @param type action executor type.
221         */
222        protected ActionExecutor(String type) {
223            this(type, RETRY_INTERVAL);
224        }
225    
226        /**
227         * Create an action executor.
228         *
229         * @param type action executor type.
230         * @param retryAttempts retry attempts.
231         * @param retryInterval retry interval, in seconds.
232         */
233        protected ActionExecutor(String type, long retryInterval) {
234            this.type = ParamChecker.notEmpty(type, "type");
235            this.maxRetries = getOozieConf().getInt(MAX_RETRIES, 3);
236            this.retryInterval = retryInterval;
237        }
238    
239        /**
240         * Clear all init settings for all action types.
241         */
242        public static void resetInitInfo() {
243            if (!initMode) {
244                throw new IllegalStateException("Error, action type info locked");
245            }
246            ERROR_INFOS.clear();
247        }
248    
249        /**
250         * Enable action type initialization.
251         */
252        public static void enableInit() {
253            initMode = true;
254        }
255    
256        /**
257         * Disable action type initialization.
258         */
259        public static void disableInit() {
260            initMode = false;
261        }
262    
263        /**
264         * Invoked once at system initialization time. <p/> It can be used to register error information for the expected
265         * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing
266         * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register
267         * all its possible errors. <p/> Subclasses overriding must invoke super.
268         */
269        public void initActionType() {
270            XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType());
271            ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>());
272        }
273    
274        /**
275         * Return the system ID, this ID is defined in Oozie configuration.
276         *
277         * @return the system ID.
278         */
279        public String getOozieSystemId() {
280            return Services.get().getSystemId();
281        }
282    
283        /**
284         * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
285         * new directory per system initialization.
286         *
287         * @return the runtime directory of the Oozie instance.
288         */
289        public String getOozieRuntimeDir() {
290            return Services.get().getRuntimeDir();
291        }
292    
293        /**
294         * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties.
295         *
296         * @return Oozie configuration.
297         */
298        public Configuration getOozieConf() {
299            return Services.get().getConf();
300        }
301    
302        /**
303         * Register error handling information for an exception.
304         *
305         * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed
306         * to be able to handle multiple version of Hadoop  or other JARs used by executors with the same codebase).
307         * @param errorType error type for the exception.
308         * @param errorCode error code for the exception.
309         */
310        protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) {
311            if (!initMode) {
312                throw new IllegalStateException("Error, action type info locked");
313            }
314            try {
315                Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
316                Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
317                executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass));
318            }
319            catch (ClassNotFoundException cnfe) {
320                XLog.getLog(getClass()).warn(
321                        "Exception [{0}] not in classpath, ActionExecutor [{1}] will handle it as ERROR", exClass,
322                        getType());
323            }
324            catch (java.lang.NoClassDefFoundError err) {
325                ByteArrayOutputStream baos = new ByteArrayOutputStream();
326                err.printStackTrace(new PrintStream(baos));
327                XLog.getLog(getClass()).warn(baos.toString());
328            }
329        }
330    
331        /**
332         * Return the action executor type.
333         *
334         * @return the action executor type.
335         */
336        public String getType() {
337            return type;
338        }
339    
340        /**
341         * Return the maximum number of retries for the action executor.
342         *
343         * @return the maximum number of retries for the action executor.
344         */
345        public int getMaxRetries() {
346            return maxRetries;
347        }
348    
349        /**
350         * Set the maximum number of retries for the action executor.
351         *
352         * @param maxRetries the maximum number of retries.
353         */
354        public void setMaxRetries(int maxRetries) {
355            this.maxRetries = maxRetries;
356        }
357    
358        /**
359         * Return the retry interval for the action executor in seconds.
360         *
361         * @return the retry interval for the action executor in seconds.
362         */
363        public long getRetryInterval() {
364            return retryInterval;
365        }
366    
367        /**
368         * Sets the retry interval for the action executor.
369         *
370         * @param retryInterval retry interval in seconds.
371         */
372        public void setRetryInterval(long retryInterval) {
373            this.retryInterval = retryInterval;
374        }
375    
376        /**
377         * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods
378         * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s.
379         *
380         * @param ex exception to convert.
381         * @return ActionExecutorException converted exception.
382         */
383        @SuppressWarnings({"ThrowableInstanceNeverThrown"})
384        protected ActionExecutorException convertException(Exception ex) {
385            if (ex instanceof ActionExecutorException) {
386                return (ActionExecutorException) ex;
387            }
388    
389            ActionExecutorException aee = null;
390            // Check the cause of the exception first
391            if (ex.getCause() != null) {
392                aee = convertExceptionHelper(ex.getCause());
393            }
394            // If the cause isn't registered or doesn't exist, check the exception itself
395            if (aee == null) {
396                aee = convertExceptionHelper(ex);
397                // If the cause isn't registered either, then just create a new ActionExecutorException
398                if (aee == null) {
399                    String exClass = ex.getClass().getName();
400                    String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1);
401                    aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex);
402                }
403            }
404            return aee;
405        }
406    
407        private ActionExecutorException convertExceptionHelper(Throwable ex) {
408            Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
409            // Check if we have registered ex
410            ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName());
411            if (classErrorInfo != null) {
412                return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex);
413            }
414            // Else, check if a parent class of ex is registered
415            else {
416                for (ErrorInfo errorInfo : executorErrorInfo.values()) {
417                    if (errorInfo.errorClass.isInstance(ex)) {
418                        return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex);
419                    }
420                }
421            }
422            return null;
423        }
424    
425        /**
426         * Convenience method that return the signal for an Action based on the action status.
427         *
428         * @param status action status.
429         * @return the action signal.
430         */
431        protected String getActionSignal(WorkflowAction.Status status) {
432            switch (status) {
433                case OK:
434                    return "OK";
435                case ERROR:
436                case KILLED:
437                    return "ERROR";
438                default:
439                    throw new IllegalArgumentException("Action status for signal can only be OK or ERROR");
440            }
441        }
442    
443        /**
444         * Return the path that will be used to store action specific data
445         *
446         * @param jobId Worfklow ID
447         * @param action Action
448         * @param key An Identifier
449         * @param temp temp directory flag
450         * @return A string that has the path
451         */
452        protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) {
453            String name = jobId + "/" + action.getName() + "--" + key;
454            if (temp) {
455                name += ".temp";
456            }
457            return getOozieSystemId() + "/" + name;
458        }
459    
460        /**
461         * Return the path that will be used to store action specific data.
462         *
463         * @param jobId Workflow ID
464         * @param action Action
465         * @param key An identifier
466         * @param temp Temp directory flag
467         * @return Path to the directory
468         */
469        public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) {
470            return new Path(getActionDirPath(jobId, action, key, temp));
471        }
472    
473        /**
474         * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the
475         * action has completed, the {@link Context#setExecutionData} method must be called within this method.
476         *
477         * @param context executor context.
478         * @param action the action to start.
479         * @throws ActionExecutorException thrown if the action could not start.
480         */
481        public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException;
482    
483        /**
484         * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this
485         * method.
486         *
487         * @param context executor context.
488         * @param action the action to end.
489         * @throws ActionExecutorException thrown if the action could not end.
490         */
491        public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException;
492    
493        /**
494         * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action
495         * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action
496         * has not completed, the {@link Context#setExternalStatus} method must be called within this method.
497         *
498         * @param context executor context.
499         * @param action the action to end.
500         * @throws ActionExecutorException thrown if the action could not be checked.
501         */
502        public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException;
503    
504        /**
505         * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method.
506         *
507         * @param context executor context.
508         * @param action the action to kill.
509         * @throws ActionExecutorException thrown if the action could not be killed.
510         */
511        public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException;
512    
513        /**
514         * Return if the external status indicates that the action has completed.
515         *
516         * @param externalStatus external status to check.
517         * @return if the external status indicates that the action has completed.
518         */
519        public abstract boolean isCompleted(String externalStatus);
520    
521    }