001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.Date;
025    import java.util.Properties;
026    import java.util.Set;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.oozie.DagELFunctions;
032    import org.apache.oozie.ErrorCode;
033    import org.apache.oozie.WorkflowActionBean;
034    import org.apache.oozie.WorkflowJobBean;
035    import org.apache.oozie.action.ActionExecutor;
036    import org.apache.oozie.client.WorkflowAction;
037    import org.apache.oozie.client.WorkflowJob;
038    import org.apache.oozie.command.CommandException;
039    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
040    import org.apache.oozie.service.CallbackService;
041    import org.apache.oozie.service.ELService;
042    import org.apache.oozie.service.HadoopAccessorException;
043    import org.apache.oozie.service.HadoopAccessorService;
044    import org.apache.oozie.service.JPAService;
045    import org.apache.oozie.service.LiteWorkflowStoreService;
046    import org.apache.oozie.service.Services;
047    import org.apache.oozie.util.ELEvaluator;
048    import org.apache.oozie.util.InstrumentUtils;
049    import org.apache.oozie.util.Instrumentation;
050    import org.apache.oozie.util.XConfiguration;
051    import org.apache.oozie.workflow.WorkflowException;
052    import org.apache.oozie.workflow.WorkflowInstance;
053    import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
054    
055    /**
056     * Base class for Action execution commands. Provides common functionality to handle different types of errors while
057     * attempting to start or end an action.
058     */
059    public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
060        private static final String INSTRUMENTATION_GROUP = "action.executors";
061    
062        protected static final String INSTR_FAILED_JOBS_COUNTER = "failed";
063    
064        protected static final String RECOVERY_ID_SEPARATOR = "@";
065    
066        public ActionXCommand(String name, String type, int priority) {
067            super(name, type, priority);
068        }
069    
070        /**
071         * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
072         * attempts have been made. Otherwise returns false.
073         *
074         * @param context the execution context.
075         * @param executor the executor instance being used.
076         * @param status the status to be set for the action.
077         * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
078         *         maximum number of configured retries.
079         * @throws CommandException thrown if unable to handle transient
080         */
081        protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
082                WorkflowAction.Status status) throws CommandException {
083            LOG.debug("Attempting to retry");
084            ActionExecutorContext aContext = (ActionExecutorContext) context;
085            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
086            incrActionErrorCounter(action.getType(), "transient", 1);
087    
088            int actionRetryCount = action.getRetries();
089            if (actionRetryCount >= executor.getMaxRetries()) {
090                LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
091                return false;
092            }
093            else {
094                action.setStatus(status);
095                action.setPending();
096                action.incRetries();
097                long retryDelayMillis = executor.getRetryInterval() * 1000;
098                action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
099                LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
100                this.resetUsed();
101                queue(this, retryDelayMillis);
102                return true;
103            }
104        }
105    
106        /**
107         * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
108         * set pending flag of action to false
109         *
110         * @param context the execution context.
111         * @param executor the executor instance being used.
112         * @param status the status to be set for the action.
113         * @throws CommandException thrown if unable to suspend job
114         */
115        protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
116                WorkflowAction.Status status) throws CommandException {
117            ActionExecutorContext aContext = (ActionExecutorContext) context;
118            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
119            incrActionErrorCounter(action.getType(), "nontransient", 1);
120            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
121            String id = workflow.getId();
122            action.setStatus(status);
123            action.resetPendingOnly();
124            LOG.warn("Suspending Workflow Job id=" + id);
125            try {
126                SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
127            }
128            catch (Exception e) {
129                throw new CommandException(ErrorCode.E0727, id, e.getMessage());
130            }
131            finally {
132                // update coordinator action
133                new CoordActionUpdateXCommand(workflow, 3).call();
134            }
135        }
136    
137        /**
138         * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
139         * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
140         * </p>
141         *
142         * @param context the execution context.
143         * @param executor the executor instance being used.
144         * @param message
145         * @param isStart whether the error was generated while starting or ending an action.
146         * @param status the status to be set for the action.
147         * @throws CommandException thrown if unable to handle action error
148         */
149        protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
150                boolean isStart, WorkflowAction.Status status) throws CommandException {
151            LOG.warn("Setting Action Status to [{0}]", status);
152            ActionExecutorContext aContext = (ActionExecutorContext) context;
153            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
154    
155            if (!handleUserRetry(action)) {
156                incrActionErrorCounter(action.getType(), "error", 1);
157                action.setPending();
158                if (isStart) {
159                    action.setExecutionData(message, null);
160                    queue(new ActionEndXCommand(action.getId(), action.getType()));
161                }
162                else {
163                    action.setEndData(status, WorkflowAction.Status.ERROR.toString());
164                }
165            }
166        }
167    
168        /**
169         * Fail the job due to failed action
170         *
171         * @param context the execution context.
172         * @throws CommandException thrown if unable to fail job
173         */
174        public void failJob(ActionExecutor.Context context) throws CommandException {
175            ActionExecutorContext aContext = (ActionExecutorContext) context;
176            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
177            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
178    
179            if (!handleUserRetry(action)) {
180                incrActionErrorCounter(action.getType(), "failed", 1);
181                LOG.warn("Failing Job due to failed action [{0}]", action.getName());
182                try {
183                    workflow.getWorkflowInstance().fail(action.getName());
184                    WorkflowInstance wfInstance = workflow.getWorkflowInstance();
185                    ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
186                    workflow.setWorkflowInstance(wfInstance);
187                    workflow.setStatus(WorkflowJob.Status.FAILED);
188                    action.setStatus(WorkflowAction.Status.FAILED);
189                    action.resetPending();
190                    queue(new NotificationXCommand(workflow, action));
191                    queue(new KillXCommand(workflow.getId()));
192                    InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
193                }
194                catch (WorkflowException ex) {
195                    throw new CommandException(ex);
196                }
197            }
198        }
199    
200        /**
201         * Execute retry for action if this action is eligible for user-retry
202         *
203         * @param context the execution context.
204         * @return true if user-retry has to be handled for this action
205         * @throws CommandException thrown if unable to fail job
206         */
207        public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
208            String errorCode = action.getErrorCode();
209            Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
210    
211            if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
212                LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
213                        + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
214                        .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
215                int interval = action.getUserRetryInterval() * 60 * 1000;
216                action.setStatus(WorkflowAction.Status.USER_RETRY);
217                action.incrmentUserRetryCount();
218                action.setPending();
219                queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
220                return true;
221            }
222            return false;
223        }
224    
225            /*
226             * In case of action error increment the error count for instrumentation
227             */
228        private void incrActionErrorCounter(String type, String error, int count) {
229            getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
230        }
231    
232            /**
233             * Increment the action counter in the instrumentation log. indicating how
234             * many times the action was executed since the start Oozie server
235             */
236        protected void incrActionCounter(String type, int count) {
237            getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
238        }
239    
240            /**
241             * Adding a cron for the instrumentation time for the given Instrumentation
242             * group
243             */
244        protected void addActionCron(String type, Instrumentation.Cron cron) {
245            getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
246        }
247    
248        /**
249         * Workflow action executor context
250         *
251         */
252        public static class ActionExecutorContext implements ActionExecutor.Context {
253            private final WorkflowJobBean workflow;
254            private Configuration protoConf;
255            private final WorkflowActionBean action;
256            private final boolean isRetry;
257            private final boolean isUserRetry;
258            private boolean started;
259            private boolean ended;
260            private boolean executed;
261    
262                    /**
263                     * Constructing the ActionExecutorContext, setting the private members
264                     * and constructing the proto configuration
265                     */
266            public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
267                this.workflow = workflow;
268                this.action = action;
269                this.isRetry = isRetry;
270                this.isUserRetry = isUserRetry;
271                try {
272                    protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
273                }
274                catch (IOException ex) {
275                    throw new RuntimeException("It should not happen", ex);
276                }
277            }
278    
279            /*
280             * (non-Javadoc)
281             * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
282             */
283            public String getCallbackUrl(String externalStatusVar) {
284                return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
285            }
286    
287            /*
288             * (non-Javadoc)
289             * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
290             */
291            public Configuration getProtoActionConf() {
292                return protoConf;
293            }
294    
295            /*
296             * (non-Javadoc)
297             * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
298             */
299            public WorkflowJob getWorkflow() {
300                return workflow;
301            }
302    
303            /**
304             * Returns the workflow action of the given action context
305             *
306             * @return the workflow action of the given action context
307             */
308            public WorkflowAction getAction() {
309                return action;
310            }
311    
312            /*
313             * (non-Javadoc)
314             * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
315             */
316            public ELEvaluator getELEvaluator() {
317                ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
318                DagELFunctions.configureEvaluator(evaluator, workflow, action);
319                return evaluator;
320            }
321    
322            /*
323             * (non-Javadoc)
324             * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
325             */
326            public void setVar(String name, String value) {
327                name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
328                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
329                wfInstance.setVar(name, value);
330                workflow.setWorkflowInstance(wfInstance);
331            }
332    
333            /*
334             * (non-Javadoc)
335             * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
336             */
337            public String getVar(String name) {
338                name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
339                return workflow.getWorkflowInstance().getVar(name);
340            }
341    
342            /*
343             * (non-Javadoc)
344             * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
345             */
346            public void setStartData(String externalId, String trackerUri, String consoleUrl) {
347                action.setStartData(externalId, trackerUri, consoleUrl);
348                started = true;
349            }
350    
351            /**
352             * Setting the start time of the action
353             */
354            public void setStartTime() {
355                Date now = new Date();
356                action.setStartTime(now);
357            }
358    
359            /*
360             * (non-Javadoc)
361             * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
362             */
363            public void setExecutionData(String externalStatus, Properties actionData) {
364                action.setExecutionData(externalStatus, actionData);
365                executed = true;
366            }
367    
368            /*
369             * (non-Javadoc)
370             * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
371             */
372            public void setExecutionStats(String jsonStats) {
373                action.setExecutionStats(jsonStats);
374                executed = true;
375            }
376    
377            /*
378             * (non-Javadoc)
379             * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
380             */
381            public void setExternalChildIDs(String externalChildIDs) {
382                action.setExternalChildIDs(externalChildIDs);
383                executed = true;
384            }
385    
386            /*
387             * (non-Javadoc)
388             * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
389             */
390            public void setEndData(WorkflowAction.Status status, String signalValue) {
391                action.setEndData(status, signalValue);
392                ended = true;
393            }
394    
395            /*
396             * (non-Javadoc)
397             * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
398             */
399            public boolean isRetry() {
400                return isRetry;
401            }
402    
403            /**
404             * Return if the executor invocation is a user retry or not.
405             *
406             * @return if the executor invocation is a user retry or not.
407             */
408            public boolean isUserRetry() {
409                return isUserRetry;
410            }
411    
412            /**
413             * Returns whether setStartData has been called or not.
414             *
415             * @return true if start completion info has been set.
416             */
417            public boolean isStarted() {
418                return started;
419            }
420    
421            /**
422             * Returns whether setExecutionData has been called or not.
423             *
424             * @return true if execution completion info has been set, otherwise false.
425             */
426            public boolean isExecuted() {
427                return executed;
428            }
429    
430            /**
431             * Returns whether setEndData has been called or not.
432             *
433             * @return true if end completion info has been set.
434             */
435            public boolean isEnded() {
436                return ended;
437            }
438    
439            public void setExternalStatus(String externalStatus) {
440                action.setExternalStatus(externalStatus);
441            }
442    
443            @Override
444            public String getRecoveryId() {
445                return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
446            }
447    
448            /* (non-Javadoc)
449             * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
450             */
451            public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
452                String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
453                FileSystem fs = getAppFileSystem();
454                String actionDirPath = Services.get().getSystemId() + "/" + name;
455                Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
456                return fqActionDir;
457            }
458    
459            /* (non-Javadoc)
460             * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
461             */
462            public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
463                WorkflowJob workflow = getWorkflow();
464                URI uri = new URI(getWorkflow().getAppPath());
465                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
466                Configuration fsConf = has.createJobConf(uri.getAuthority());
467                return has.createFileSystem(workflow.getUser(), uri, fsConf);
468    
469            }
470    
471            /* (non-Javadoc)
472             * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
473             */
474            @Override
475            public void setErrorInfo(String str, String exMsg) {
476                action.setErrorInfo(str, exMsg);
477            }
478        }
479    
480    }