001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URI;
023    import java.net.URISyntaxException;
024    import java.util.Date;
025    import java.util.Properties;
026    import java.util.Set;
027    
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.oozie.DagELFunctions;
032    import org.apache.oozie.ErrorCode;
033    import org.apache.oozie.WorkflowActionBean;
034    import org.apache.oozie.WorkflowJobBean;
035    import org.apache.oozie.action.ActionExecutor;
036    import org.apache.oozie.client.WorkflowAction;
037    import org.apache.oozie.client.WorkflowJob;
038    import org.apache.oozie.command.CommandException;
039    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
040    import org.apache.oozie.service.CallbackService;
041    import org.apache.oozie.service.ELService;
042    import org.apache.oozie.service.HadoopAccessorException;
043    import org.apache.oozie.service.HadoopAccessorService;
044    import org.apache.oozie.service.JPAService;
045    import org.apache.oozie.service.LiteWorkflowStoreService;
046    import org.apache.oozie.service.Services;
047    import org.apache.oozie.util.ELEvaluator;
048    import org.apache.oozie.util.InstrumentUtils;
049    import org.apache.oozie.util.Instrumentation;
050    import org.apache.oozie.util.XConfiguration;
051    import org.apache.oozie.workflow.WorkflowException;
052    import org.apache.oozie.workflow.WorkflowInstance;
053    import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
054    
055    /**
056     * Base class for Action execution commands. Provides common functionality to handle different types of errors while
057     * attempting to start or end an action.
058     */
059    public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
060        private static final String INSTRUMENTATION_GROUP = "action.executors";
061    
062        protected static final String INSTR_FAILED_JOBS_COUNTER = "failed";
063    
064        protected static final String RECOVERY_ID_SEPARATOR = "@";
065    
066        public ActionXCommand(String name, String type, int priority) {
067            super(name, type, priority);
068        }
069    
070        /**
071         * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
072         * attempts have been made. Otherwise returns false.
073         *
074         * @param context the execution context.
075         * @param executor the executor instance being used.
076         * @param status the status to be set for the action.
077         * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
078         *         maximum number of configured retries.
079         * @throws CommandException thrown if unable to handle transient
080         */
081        protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
082                WorkflowAction.Status status) throws CommandException {
083            LOG.debug("Attempting to retry");
084            ActionExecutorContext aContext = (ActionExecutorContext) context;
085            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
086            incrActionErrorCounter(action.getType(), "transient", 1);
087    
088            int actionRetryCount = action.getRetries();
089            if (actionRetryCount >= executor.getMaxRetries()) {
090                LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
091                return false;
092            }
093            else {
094                action.setStatus(status);
095                action.setPending();
096                action.incRetries();
097                long retryDelayMillis = executor.getRetryInterval() * 1000;
098                action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
099                LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
100                this.resetUsed();
101                queue(this, retryDelayMillis);
102                return true;
103            }
104        }
105    
106        /**
107         * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
108         * set pending flag of action to false
109         *
110         * @param context the execution context.
111         * @param executor the executor instance being used.
112         * @param status the status to be set for the action.
113         * @throws CommandException thrown if unable to suspend job
114         */
115        protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
116                WorkflowAction.Status status) throws CommandException {
117            ActionExecutorContext aContext = (ActionExecutorContext) context;
118            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
119            incrActionErrorCounter(action.getType(), "nontransient", 1);
120            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
121            String id = workflow.getId();
122            action.setStatus(status);
123            action.resetPendingOnly();
124            LOG.warn("Suspending Workflow Job id=" + id);
125            try {
126                SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
127            }
128            catch (Exception e) {
129                throw new CommandException(ErrorCode.E0727, id, e.getMessage());
130            }
131            finally {
132                // update coordinator action
133                new CoordActionUpdateXCommand(workflow, 3).call();
134            }
135        }
136    
137        /**
138         * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
139         * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
140         * </p>
141         *
142         * @param context the execution context.
143         * @param executor the executor instance being used.
144         * @param message
145         * @param isStart whether the error was generated while starting or ending an action.
146         * @param status the status to be set for the action.
147         * @throws CommandException thrown if unable to handle action error
148         */
149        protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
150                boolean isStart, WorkflowAction.Status status) throws CommandException {
151            LOG.warn("Setting Action Status to [{0}]", status);
152            ActionExecutorContext aContext = (ActionExecutorContext) context;
153            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
154    
155            if (!handleUserRetry(action)) {
156                incrActionErrorCounter(action.getType(), "error", 1);
157                action.setPending();
158                if (isStart) {
159                    action.setExecutionData(message, null);
160                    queue(new ActionEndXCommand(action.getId(), action.getType()));
161                }
162                else {
163                    action.setEndData(status, WorkflowAction.Status.ERROR.toString());
164                }
165            }
166        }
167    
168        /**
169         * Fail the job due to failed action
170         *
171         * @param context the execution context.
172         * @throws CommandException thrown if unable to fail job
173         */
174        public void failJob(ActionExecutor.Context context) throws CommandException {
175            ActionExecutorContext aContext = (ActionExecutorContext) context;
176            WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
177            failJob(context, action);
178        }
179    
180        /**
181         * Fail the job due to failed action
182         *
183         * @param context the execution context.
184         * @param action the action that caused the workflow to fail
185         * @throws CommandException thrown if unable to fail job
186         */
187        public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
188            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
189            if (!handleUserRetry(action)) {
190                incrActionErrorCounter(action.getType(), "failed", 1);
191                LOG.warn("Failing Job due to failed action [{0}]", action.getName());
192                try {
193                    workflow.getWorkflowInstance().fail(action.getName());
194                    WorkflowInstance wfInstance = workflow.getWorkflowInstance();
195                    ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
196                    workflow.setWorkflowInstance(wfInstance);
197                    workflow.setStatus(WorkflowJob.Status.FAILED);
198                    action.setStatus(WorkflowAction.Status.FAILED);
199                    action.resetPending();
200                    queue(new NotificationXCommand(workflow, action));
201                    queue(new KillXCommand(workflow.getId()));
202                    InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
203                }
204                catch (WorkflowException ex) {
205                    throw new CommandException(ex);
206                }
207            }
208        }
209    
210        /**
211         * Execute retry for action if this action is eligible for user-retry
212         *
213         * @param context the execution context.
214         * @return true if user-retry has to be handled for this action
215         * @throws CommandException thrown if unable to fail job
216         */
217        public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
218            String errorCode = action.getErrorCode();
219            Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
220    
221            if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
222                LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
223                        + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
224                        .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
225                int interval = action.getUserRetryInterval() * 60 * 1000;
226                action.setStatus(WorkflowAction.Status.USER_RETRY);
227                action.incrmentUserRetryCount();
228                action.setPending();
229                queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
230                return true;
231            }
232            return false;
233        }
234    
235            /*
236             * In case of action error increment the error count for instrumentation
237             */
238        private void incrActionErrorCounter(String type, String error, int count) {
239            getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
240        }
241    
242            /**
243             * Increment the action counter in the instrumentation log. indicating how
244             * many times the action was executed since the start Oozie server
245             */
246        protected void incrActionCounter(String type, int count) {
247            getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
248        }
249    
250            /**
251             * Adding a cron for the instrumentation time for the given Instrumentation
252             * group
253             */
254        protected void addActionCron(String type, Instrumentation.Cron cron) {
255            getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
256        }
257    
258        /**
259         * Workflow action executor context
260         *
261         */
262        public static class ActionExecutorContext implements ActionExecutor.Context {
263            private final WorkflowJobBean workflow;
264            private Configuration protoConf;
265            private final WorkflowActionBean action;
266            private final boolean isRetry;
267            private final boolean isUserRetry;
268            private boolean started;
269            private boolean ended;
270            private boolean executed;
271    
272                    /**
273                     * Constructing the ActionExecutorContext, setting the private members
274                     * and constructing the proto configuration
275                     */
276            public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
277                this.workflow = workflow;
278                this.action = action;
279                this.isRetry = isRetry;
280                this.isUserRetry = isUserRetry;
281                try {
282                    protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
283                }
284                catch (IOException ex) {
285                    throw new RuntimeException("It should not happen", ex);
286                }
287            }
288    
289            /*
290             * (non-Javadoc)
291             * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
292             */
293            public String getCallbackUrl(String externalStatusVar) {
294                return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
295            }
296    
297            /*
298             * (non-Javadoc)
299             * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
300             */
301            public Configuration getProtoActionConf() {
302                return protoConf;
303            }
304    
305            /*
306             * (non-Javadoc)
307             * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
308             */
309            public WorkflowJob getWorkflow() {
310                return workflow;
311            }
312    
313            /**
314             * Returns the workflow action of the given action context
315             *
316             * @return the workflow action of the given action context
317             */
318            public WorkflowAction getAction() {
319                return action;
320            }
321    
322            /*
323             * (non-Javadoc)
324             * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
325             */
326            public ELEvaluator getELEvaluator() {
327                ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
328                DagELFunctions.configureEvaluator(evaluator, workflow, action);
329                return evaluator;
330            }
331    
332            /*
333             * (non-Javadoc)
334             * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
335             */
336            public void setVar(String name, String value) {
337                name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
338                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
339                wfInstance.setVar(name, value);
340                workflow.setWorkflowInstance(wfInstance);
341            }
342    
343            /*
344             * (non-Javadoc)
345             * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
346             */
347            public String getVar(String name) {
348                name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
349                return workflow.getWorkflowInstance().getVar(name);
350            }
351    
352            /*
353             * (non-Javadoc)
354             * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
355             */
356            public void setStartData(String externalId, String trackerUri, String consoleUrl) {
357                action.setStartData(externalId, trackerUri, consoleUrl);
358                started = true;
359            }
360    
361            /**
362             * Setting the start time of the action
363             */
364            public void setStartTime() {
365                Date now = new Date();
366                action.setStartTime(now);
367            }
368    
369            /*
370             * (non-Javadoc)
371             * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
372             */
373            public void setExecutionData(String externalStatus, Properties actionData) {
374                action.setExecutionData(externalStatus, actionData);
375                executed = true;
376            }
377    
378            /*
379             * (non-Javadoc)
380             * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
381             */
382            public void setExecutionStats(String jsonStats) {
383                action.setExecutionStats(jsonStats);
384                executed = true;
385            }
386    
387            /*
388             * (non-Javadoc)
389             * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
390             */
391            public void setExternalChildIDs(String externalChildIDs) {
392                action.setExternalChildIDs(externalChildIDs);
393                executed = true;
394            }
395    
396            /*
397             * (non-Javadoc)
398             * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
399             */
400            public void setEndData(WorkflowAction.Status status, String signalValue) {
401                action.setEndData(status, signalValue);
402                ended = true;
403            }
404    
405            /*
406             * (non-Javadoc)
407             * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
408             */
409            public boolean isRetry() {
410                return isRetry;
411            }
412    
413            /**
414             * Return if the executor invocation is a user retry or not.
415             *
416             * @return if the executor invocation is a user retry or not.
417             */
418            public boolean isUserRetry() {
419                return isUserRetry;
420            }
421    
422            /**
423             * Returns whether setStartData has been called or not.
424             *
425             * @return true if start completion info has been set.
426             */
427            public boolean isStarted() {
428                return started;
429            }
430    
431            /**
432             * Returns whether setExecutionData has been called or not.
433             *
434             * @return true if execution completion info has been set, otherwise false.
435             */
436            public boolean isExecuted() {
437                return executed;
438            }
439    
440            /**
441             * Returns whether setEndData has been called or not.
442             *
443             * @return true if end completion info has been set.
444             */
445            public boolean isEnded() {
446                return ended;
447            }
448    
449            public void setExternalStatus(String externalStatus) {
450                action.setExternalStatus(externalStatus);
451            }
452    
453            @Override
454            public String getRecoveryId() {
455                return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
456            }
457    
458            /* (non-Javadoc)
459             * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
460             */
461            public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
462                String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
463                FileSystem fs = getAppFileSystem();
464                String actionDirPath = Services.get().getSystemId() + "/" + name;
465                Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
466                return fqActionDir;
467            }
468    
469            /* (non-Javadoc)
470             * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
471             */
472            public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
473                WorkflowJob workflow = getWorkflow();
474                URI uri = new URI(getWorkflow().getAppPath());
475                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
476                Configuration fsConf = has.createJobConf(uri.getAuthority());
477                return has.createFileSystem(workflow.getUser(), uri, fsConf);
478    
479            }
480    
481            /* (non-Javadoc)
482             * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
483             */
484            @Override
485            public void setErrorInfo(String str, String exMsg) {
486                action.setErrorInfo(str, exMsg);
487            }
488        }
489    
490    }