001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.Date;
025    import java.util.Properties;
026    
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.hadoop.fs.FileSystem;
029    import org.apache.hadoop.fs.Path;
030    import org.apache.oozie.DagELFunctions;
031    import org.apache.oozie.WorkflowActionBean;
032    import org.apache.oozie.WorkflowJobBean;
033    import org.apache.oozie.action.ActionExecutor;
034    import org.apache.oozie.client.WorkflowAction;
035    import org.apache.oozie.client.WorkflowJob;
036    import org.apache.oozie.command.CommandException;
037    import org.apache.oozie.service.CallbackService;
038    import org.apache.oozie.service.ELService;
039    import org.apache.oozie.service.HadoopAccessorException;
040    import org.apache.oozie.service.HadoopAccessorService;
041    import org.apache.oozie.service.Services;
042    import org.apache.oozie.store.StoreException;
043    import org.apache.oozie.store.WorkflowStore;
044    import org.apache.oozie.util.ELEvaluator;
045    import org.apache.oozie.util.Instrumentation;
046    import org.apache.oozie.util.XConfiguration;
047    import org.apache.oozie.util.XLog;
048    import org.apache.oozie.workflow.WorkflowException;
049    import org.apache.oozie.workflow.WorkflowInstance;
050    import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
051    
052    /**
053     * Base class for Action execution commands. Provides common functionality to handle different types of errors while
054     * attempting to start or end an action.
055     */
056    public abstract class ActionCommand<T> extends WorkflowCommand<Void> {
057        private static final String INSTRUMENTATION_GROUP = "action.executors";
058    
059        protected static final String INSTR_FAILED_JOBS_COUNTER = "failed";
060    
061        protected static final String RECOVERY_ID_SEPARATOR = "@";
062    
063        public ActionCommand(String name, String type, int priority) {
064            super(name, type, priority, XLog.STD);
065        }
066    
067        /**
068         * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
069         * attempts have been made. Otherwise returns false.
070         *
071         * @param context the execution context.
072         * @param executor the executor instance being used.
073         * @param status the status to be set for the action.
074         * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
075         *         maximum number of configured retries.
076         * @throws StoreException
077         * @throws org.apache.oozie.command.CommandException
078         */
079        protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor, WorkflowAction.Status status)
080                throws StoreException, CommandException {
081            XLog.getLog(getClass()).debug("Attempting to retry");
082            ActionExecutorContext aContext = (ActionExecutorContext) context;
083            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
084            incrActionErrorCounter(action.getType(), "transient", 1);
085    
086            int actionRetryCount = action.getRetries();
087            if (actionRetryCount >= executor.getMaxRetries()) {
088                XLog.getLog(getClass()).warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
089                return false;
090            }
091            else {
092                action.setStatus(status);
093                action.setPending();
094                action.incRetries();
095                long retryDelayMillis = executor.getRetryInterval() * 1000;
096                action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
097                XLog.getLog(getClass()).info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds",
098                                             actionRetryCount + 1, retryDelayMillis);
099                queueCallable(this, retryDelayMillis);
100                return true;
101            }
102        }
103    
104        /**
105         * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL
106         * and set pending flag of action to false
107         *
108         * @param store WorkflowStore
109         * @param context the execution context.
110         * @param executor the executor instance being used.
111         * @param status the status to be set for the action.
112         * @throws StoreException
113         * @throws CommandException
114         */
115        protected void handleNonTransient(WorkflowStore store, ActionExecutor.Context context, ActionExecutor executor,
116                WorkflowAction.Status status)
117                throws StoreException, CommandException {
118            ActionExecutorContext aContext = (ActionExecutorContext) context;
119            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
120            incrActionErrorCounter(action.getType(), "nontransient", 1);
121            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
122            String id = workflow.getId();
123            action.setStatus(status);
124            action.resetPendingOnly();
125            XLog.getLog(getClass()).warn("Suspending Workflow Job id=" + id);
126            try {
127                SuspendCommand.suspendJob(store, workflow, id, action.getId());
128            }
129            catch (WorkflowException e) {
130                throw new CommandException(e);
131            }
132        }
133    
134        /**
135         * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
136         * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
137         * </p>
138         *
139         * @param context the execution context.
140         * @param executor the executor instance being used.
141         * @param message
142         * @param isStart whether the error was generated while starting or ending an action.
143         * @param status the status to be set for the action.
144         * @throws org.apache.oozie.command.CommandException
145         */
146        protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
147                                   boolean isStart, WorkflowAction.Status status) throws CommandException {
148            XLog.getLog(getClass()).warn("Setting Action Status to [{0}]", status);
149            ActionExecutorContext aContext = (ActionExecutorContext) context;
150            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
151            incrActionErrorCounter(action.getType(), "error", 1);
152            action.setPending();
153            if (isStart) {
154                action.setExecutionData(message, null);
155                queueCallable(new ActionEndCommand(action.getId(), action.getType()));
156            }
157            else {
158                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
159            }
160        }
161    
162        public void failJob(ActionExecutor.Context context) throws CommandException {
163            ActionExecutorContext aContext = (ActionExecutorContext) context;
164            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
165            incrActionErrorCounter(action.getType(), "failed", 1);
166            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
167            XLog.getLog(getClass()).warn("Failing Job due to failed action [{0}]", action.getName());
168            try {
169                workflow.getWorkflowInstance().fail(action.getName());
170                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
171                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
172                workflow.setWorkflowInstance(wfInstance);
173                workflow.setStatus(WorkflowJob.Status.FAILED);
174                action.setStatus(WorkflowAction.Status.FAILED);
175                action.resetPending();
176                queueCallable(new NotificationCommand(workflow, action));
177                queueCallable(new KillCommand(workflow.getId()));
178                incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1);
179            }
180            catch (WorkflowException ex) {
181                throw new CommandException(ex);
182            }
183        }
184    
185        private void incrActionErrorCounter(String type, String error, int count) {
186            getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
187        }
188    
189        protected void incrActionCounter(String type, int count) {
190            getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
191        }
192    
193        protected void addActionCron(String type, Instrumentation.Cron cron) {
194            getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
195        }
196    
197        public static class ActionExecutorContext implements ActionExecutor.Context {
198            private WorkflowJobBean workflow;
199            private Configuration protoConf;
200            private WorkflowActionBean action;
201            private boolean isRetry;
202            private boolean started;
203            private boolean ended;
204            private boolean executed;
205    
206            public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry) {
207                this.workflow = workflow;
208                this.action = action;
209                this.isRetry = isRetry;
210                try {
211                    protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
212                }
213                catch (IOException ex) {
214                    throw new RuntimeException("It should not happen", ex);
215                }
216            }
217    
218            public String getCallbackUrl(String externalStatusVar) {
219                return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
220            }
221    
222            public Configuration getProtoActionConf() {
223                return protoConf;
224            }
225    
226            public WorkflowJob getWorkflow() {
227                return workflow;
228            }
229    
230            public WorkflowAction getAction() {
231                return action;
232            }
233    
234            public ELEvaluator getELEvaluator() {
235                ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
236                DagELFunctions.configureEvaluator(evaluator, workflow, action);
237                return evaluator;
238            }
239    
240            public void setVar(String name, String value) {
241                name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
242                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
243                wfInstance.setVar(name, value);
244                //workflow.getWorkflowInstance().setVar(name, value);
245                workflow.setWorkflowInstance(wfInstance);
246            }
247    
248            public String getVar(String name) {
249                name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
250                return workflow.getWorkflowInstance().getVar(name);
251            }
252    
253            public void setStartData(String externalId, String trackerUri, String consoleUrl) {
254                action.setStartData(externalId, trackerUri, consoleUrl);
255                started = true;
256            }
257    
258            public void setExecutionData(String externalStatus, Properties actionData) {
259                action.setExecutionData(externalStatus, actionData);
260                executed = true;
261            }
262    
263            public void setEndData(WorkflowAction.Status status, String signalValue) {
264                action.setEndData(status, signalValue);
265                ended = true;
266            }
267    
268            public boolean isRetry() {
269                return isRetry;
270            }
271    
272            /**
273             * Returns whether setStartData has been called or not.
274             *
275             * @return true if start completion info has been set.
276             */
277            public boolean isStarted() {
278                return started;
279            }
280    
281            /**
282             * Returns whether setExecutionData has been called or not.
283             *
284             * @return true if execution completion info has been set, otherwise false.
285             */
286            public boolean isExecuted() {
287                return executed;
288            }
289    
290    
291            /**
292             * Returns whether setEndData has been called or not.
293             *
294             * @return true if end completion info has been set.
295             */
296            public boolean isEnded() {
297                return ended;
298            }
299    
300            public void setExternalStatus(String externalStatus) {
301                action.setExternalStatus(externalStatus);
302            }
303    
304            @Override
305            public String getRecoveryId() {
306                return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
307            }
308    
309            /* (non-Javadoc)
310             * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
311             */
312            public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
313                String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
314                FileSystem fs = getAppFileSystem();
315                String actionDirPath = Services.get().getSystemId() + "/" + name;
316                Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
317                return fqActionDir;
318            }
319    
320            /* (non-Javadoc)
321             * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
322             */
323            public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
324                WorkflowJob workflow = getWorkflow();
325                XConfiguration jobConf = new XConfiguration(new StringReader(workflow.getConf()));
326                Configuration fsConf = new Configuration();
327                XConfiguration.copy(jobConf, fsConf);
328                return Services.get().get(HadoopAccessorService.class).createFileSystem(workflow.getUser(),
329                        workflow.getGroup(), new URI(getWorkflow().getAppPath()), fsConf);
330    
331            }
332    
333            @Override
334            public void setErrorInfo(String str, String exMsg) {
335                action.setErrorInfo(str, exMsg);
336            }
337        }
338    
339    }