001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.Date;
025    import java.util.Properties;
026    import java.util.Set;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.oozie.DagELFunctions;
032    import org.apache.oozie.ErrorCode;
033    import org.apache.oozie.WorkflowActionBean;
034    import org.apache.oozie.WorkflowJobBean;
035    import org.apache.oozie.action.ActionExecutor;
036    import org.apache.oozie.client.WorkflowAction;
037    import org.apache.oozie.client.WorkflowJob;
038    import org.apache.oozie.command.CommandException;
039    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
040    import org.apache.oozie.service.CallbackService;
041    import org.apache.oozie.service.ELService;
042    import org.apache.oozie.service.HadoopAccessorException;
043    import org.apache.oozie.service.HadoopAccessorService;
044    import org.apache.oozie.service.JPAService;
045    import org.apache.oozie.service.LiteWorkflowStoreService;
046    import org.apache.oozie.service.Services;
047    import org.apache.oozie.util.ELEvaluator;
048    import org.apache.oozie.util.InstrumentUtils;
049    import org.apache.oozie.util.Instrumentation;
050    import org.apache.oozie.util.XConfiguration;
051    import org.apache.oozie.workflow.WorkflowException;
052    import org.apache.oozie.workflow.WorkflowInstance;
053    import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
054    
055    /**
056     * Base class for Action execution commands. Provides common functionality to handle different types of errors while
057     * attempting to start or end an action.
058     */
059    public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
    /** Instrumentation group under which every counter and cron recorded by this class is registered. */
    private static final String INSTRUMENTATION_GROUP = "action.executors";

    /** Name of the job counter that {@code failJob} increments each time it fails a workflow job. */
    protected static final String INSTR_FAILED_JOBS_COUNTER = "failed";

    /** Separator placed between the action id and the workflow run value when a recovery id is built. */
    protected static final String RECOVERY_ID_SEPARATOR = "@";
065    
    /**
     * Creates the command. All three arguments are handed unchanged to the {@link WorkflowXCommand} constructor.
     *
     * @param name command name; also used by this class as the suffix of per-action counter and cron names.
     * @param type command type, as understood by the super class.
     * @param priority command priority, as understood by the super class.
     */
    public ActionXCommand(String name, String type, int priority) {
        super(name, type, priority);
    }
069    
070        /**
071         * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
072         * attempts have been made. Otherwise returns false.
073         *
074         * @param context the execution context.
075         * @param executor the executor instance being used.
076         * @param status the status to be set for the action.
077         * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
078         *         maximum number of configured retries.
079         * @throws CommandException thrown if unable to handle transient
080         */
081        protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
082                WorkflowAction.Status status) throws CommandException {
083            LOG.debug("Attempting to retry");
084            ActionExecutorContext aContext = (ActionExecutorContext) context;
085            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
086            incrActionErrorCounter(action.getType(), "transient", 1);
087    
088            int actionRetryCount = action.getRetries();
089            if (actionRetryCount >= executor.getMaxRetries()) {
090                LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
091                return false;
092            }
093            else {
094                action.setStatus(status);
095                action.setPending();
096                action.incRetries();
097                long retryDelayMillis = executor.getRetryInterval() * 1000;
098                action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
099                LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
100                this.resetUsed();
101                queue(this, retryDelayMillis);
102                return true;
103            }
104        }
105    
106        /**
107         * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
108         * set pending flag of action to false
109         *
110         * @param context the execution context.
111         * @param executor the executor instance being used.
112         * @param status the status to be set for the action.
113         * @throws CommandException thrown if unable to suspend job
114         */
115        protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
116                WorkflowAction.Status status) throws CommandException {
117            ActionExecutorContext aContext = (ActionExecutorContext) context;
118            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
119            incrActionErrorCounter(action.getType(), "nontransient", 1);
120            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
121            String id = workflow.getId();
122            action.setStatus(status);
123            action.resetPendingOnly();
124            LOG.warn("Suspending Workflow Job id=" + id);
125            try {
126                SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId());
127            }
128            catch (Exception e) {
129                throw new CommandException(ErrorCode.E0727, e.getMessage());
130            }
131            finally {
132                // update coordinator action
133                new CoordActionUpdateXCommand(workflow, 3).call();
134            }
135        }
136    
137        /**
138         * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
139         * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
140         * </p>
141         *
142         * @param context the execution context.
143         * @param executor the executor instance being used.
144         * @param message
145         * @param isStart whether the error was generated while starting or ending an action.
146         * @param status the status to be set for the action.
147         * @throws CommandException thrown if unable to handle action error
148         */
149        protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
150                boolean isStart, WorkflowAction.Status status) throws CommandException {
151            LOG.warn("Setting Action Status to [{0}]", status);
152            ActionExecutorContext aContext = (ActionExecutorContext) context;
153            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
154    
155            if (!handleUserRetry(action)) {
156                incrActionErrorCounter(action.getType(), "error", 1);
157                action.setPending();
158                if (isStart) {
159                    action.setExecutionData(message, null);
160                    queue(new ActionEndXCommand(action.getId(), action.getType()));
161                }
162                else {
163                    action.setEndData(status, WorkflowAction.Status.ERROR.toString());
164                }
165            }
166        }
167    
168        /**
169         * Fail the job due to failed action
170         *
171         * @param context the execution context.
172         * @throws CommandException thrown if unable to fail job
173         */
174        public void failJob(ActionExecutor.Context context) throws CommandException {
175            ActionExecutorContext aContext = (ActionExecutorContext) context;
176            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
177            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
178    
179            if (!handleUserRetry(action)) {
180                incrActionErrorCounter(action.getType(), "failed", 1);
181                LOG.warn("Failing Job due to failed action [{0}]", action.getName());
182                try {
183                    workflow.getWorkflowInstance().fail(action.getName());
184                    WorkflowInstance wfInstance = workflow.getWorkflowInstance();
185                    ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
186                    workflow.setWorkflowInstance(wfInstance);
187                    workflow.setStatus(WorkflowJob.Status.FAILED);
188                    action.setStatus(WorkflowAction.Status.FAILED);
189                    action.resetPending();
190                    queue(new NotificationXCommand(workflow, action));
191                    queue(new KillXCommand(workflow.getId()));
192                    InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
193                }
194                catch (WorkflowException ex) {
195                    throw new CommandException(ex);
196                }
197            }
198        }
199    
200        /**
201         * Execute retry for action if this action is eligible for user-retry
202         *
203         * @param context the execution context.
204         * @return true if user-retry has to be handled for this action
205         * @throws CommandException thrown if unable to fail job
206         */
207        public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
208            String errorCode = action.getErrorCode();
209            Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
210    
211            if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
212                LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
213                        + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
214                        .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
215                int interval = action.getUserRetryInterval() * 60 * 1000;
216                action.setStatus(WorkflowAction.Status.USER_RETRY);
217                action.incrmentUserRetryCount();
218                action.setPending();
219                queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
220                return true;
221            }
222            return false;
223        }
224    
225        private void incrActionErrorCounter(String type, String error, int count) {
226            getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
227        }
228    
229        protected void incrActionCounter(String type, int count) {
230            getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
231        }
232    
233        protected void addActionCron(String type, Instrumentation.Cron cron) {
234            getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
235        }
236    
237        /**
238         * Workflow action executor context
239         *
240         */
241        public static class ActionExecutorContext implements ActionExecutor.Context {
242            private final WorkflowJobBean workflow;
243            private Configuration protoConf;
244            private final WorkflowActionBean action;
245            private final boolean isRetry;
246            private final boolean isUserRetry;
247            private boolean started;
248            private boolean ended;
249            private boolean executed;
250    
251            public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
252                this.workflow = workflow;
253                this.action = action;
254                this.isRetry = isRetry;
255                this.isUserRetry = isUserRetry;
256                try {
257                    protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
258                }
259                catch (IOException ex) {
260                    throw new RuntimeException("It should not happen", ex);
261                }
262            }
263    
264            public String getCallbackUrl(String externalStatusVar) {
265                return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
266            }
267    
268            public Configuration getProtoActionConf() {
269                return protoConf;
270            }
271    
272            public WorkflowJob getWorkflow() {
273                return workflow;
274            }
275    
276            public WorkflowAction getAction() {
277                return action;
278            }
279    
280            public ELEvaluator getELEvaluator() {
281                ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
282                DagELFunctions.configureEvaluator(evaluator, workflow, action);
283                return evaluator;
284            }
285    
286            public void setVar(String name, String value) {
287                name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
288                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
289                wfInstance.setVar(name, value);
290                workflow.setWorkflowInstance(wfInstance);
291            }
292    
293            public String getVar(String name) {
294                name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
295                return workflow.getWorkflowInstance().getVar(name);
296            }
297    
298            public void setStartData(String externalId, String trackerUri, String consoleUrl) {
299                action.setStartData(externalId, trackerUri, consoleUrl);
300                started = true;
301            }
302    
303            public void setExecutionData(String externalStatus, Properties actionData) {
304                action.setExecutionData(externalStatus, actionData);
305                executed = true;
306            }
307    
308            public void setEndData(WorkflowAction.Status status, String signalValue) {
309                action.setEndData(status, signalValue);
310                ended = true;
311            }
312    
313            public boolean isRetry() {
314                return isRetry;
315            }
316    
317            public boolean isUserRetry() {
318                return isUserRetry;
319            }
320    
321            /**
322             * Returns whether setStartData has been called or not.
323             *
324             * @return true if start completion info has been set.
325             */
326            public boolean isStarted() {
327                return started;
328            }
329    
330            /**
331             * Returns whether setExecutionData has been called or not.
332             *
333             * @return true if execution completion info has been set, otherwise false.
334             */
335            public boolean isExecuted() {
336                return executed;
337            }
338    
339            /**
340             * Returns whether setEndData has been called or not.
341             *
342             * @return true if end completion info has been set.
343             */
344            public boolean isEnded() {
345                return ended;
346            }
347    
348            public void setExternalStatus(String externalStatus) {
349                action.setExternalStatus(externalStatus);
350            }
351    
352            @Override
353            public String getRecoveryId() {
354                return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
355            }
356    
357            /* (non-Javadoc)
358             * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
359             */
360            public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
361                String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
362                FileSystem fs = getAppFileSystem();
363                String actionDirPath = Services.get().getSystemId() + "/" + name;
364                Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
365                return fqActionDir;
366            }
367    
368            /* (non-Javadoc)
369             * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
370             */
371            public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
372                WorkflowJob workflow = getWorkflow();
373                XConfiguration jobConf = new XConfiguration(new StringReader(workflow.getConf()));
374                Configuration fsConf = new Configuration();
375                XConfiguration.copy(jobConf, fsConf);
376                return Services.get().get(HadoopAccessorService.class).createFileSystem(workflow.getUser(),
377                        workflow.getGroup(), new URI(getWorkflow().getAppPath()), fsConf);
378    
379            }
380    
381            /* (non-Javadoc)
382             * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
383             */
384            @Override
385            public void setErrorInfo(String str, String exMsg) {
386                action.setErrorInfo(str, exMsg);
387            }
388        }
389    
390    }