001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.io.IOException;
021import java.io.StringReader;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.util.Date;
025import java.util.Properties;
026import java.util.Set;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.oozie.DagELFunctions;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.WorkflowActionBean;
034import org.apache.oozie.WorkflowJobBean;
035import org.apache.oozie.action.ActionExecutor;
036import org.apache.oozie.client.WorkflowAction;
037import org.apache.oozie.client.WorkflowJob;
038import org.apache.oozie.command.CommandException;
039import org.apache.oozie.service.CallbackService;
040import org.apache.oozie.service.ELService;
041import org.apache.oozie.service.HadoopAccessorException;
042import org.apache.oozie.service.HadoopAccessorService;
043import org.apache.oozie.service.JPAService;
044import org.apache.oozie.service.LiteWorkflowStoreService;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.util.ELEvaluator;
047import org.apache.oozie.util.InstrumentUtils;
048import org.apache.oozie.util.Instrumentation;
049import org.apache.oozie.util.XConfiguration;
050import org.apache.oozie.workflow.WorkflowException;
051import org.apache.oozie.workflow.WorkflowInstance;
052import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
053
054/**
055 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
056 * attempting to start or end an action.
057 */
058public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
059    private static final String INSTRUMENTATION_GROUP = "action.executors";
060
061    protected static final String RECOVERY_ID_SEPARATOR = "@";
062
    /**
     * Creates an action command; all three arguments are passed unchanged to the {@link WorkflowXCommand}
     * constructor.
     *
     * @param name the command name.
     * @param type the command type.
     * @param priority the command priority.
     */
    public ActionXCommand(String name, String type, int priority) {
        super(name, type, priority);
    }
066
067    /**
068     * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
069     * attempts have been made. Otherwise returns false.
070     *
071     * @param context the execution context.
072     * @param executor the executor instance being used.
073     * @param status the status to be set for the action.
074     * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
075     *         maximum number of configured retries.
076     * @throws CommandException thrown if unable to handle transient
077     */
078    protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
079            WorkflowAction.Status status) throws CommandException {
080        LOG.debug("Attempting to retry");
081        ActionExecutorContext aContext = (ActionExecutorContext) context;
082        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
083        incrActionErrorCounter(action.getType(), "transient", 1);
084
085        int actionRetryCount = action.getRetries();
086        if (actionRetryCount >= executor.getMaxRetries()) {
087            LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
088            return false;
089        }
090        else {
091            action.setStatus(status);
092            action.setPending();
093            action.incRetries();
094            long retryDelayMillis = executor.getRetryInterval() * 1000;
095            action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
096            LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
097            this.resetUsed();
098            queue(this, retryDelayMillis);
099            return true;
100        }
101    }
102
103    /**
104     * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
105     * set pending flag of action to false
106     *
107     * @param context the execution context.
108     * @param executor the executor instance being used.
109     * @param status the status to be set for the action.
110     * @throws CommandException thrown if unable to suspend job
111     */
112    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
113            WorkflowAction.Status status) throws CommandException {
114        ActionExecutorContext aContext = (ActionExecutorContext) context;
115        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
116        incrActionErrorCounter(action.getType(), "nontransient", 1);
117        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
118        String id = workflow.getId();
119        action.setStatus(status);
120        action.resetPendingOnly();
121        LOG.warn("Suspending Workflow Job id=" + id);
122        try {
123            SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
124        }
125        catch (Exception e) {
126            throw new CommandException(ErrorCode.E0727, id, e.getMessage());
127        }
128        finally {
129            updateParentIfNecessary(workflow, 3);
130        }
131    }
132
133    /**
134     * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
135     * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
136     * </p>
137     *
138     * @param context the execution context.
139     * @param executor the executor instance being used.
140     * @param message
141     * @param isStart whether the error was generated while starting or ending an action.
142     * @param status the status to be set for the action.
143     * @throws CommandException thrown if unable to handle action error
144     */
145    protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
146            boolean isStart, WorkflowAction.Status status) throws CommandException {
147        LOG.warn("Setting Action Status to [{0}]", status);
148        ActionExecutorContext aContext = (ActionExecutorContext) context;
149        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
150
151        if (!handleUserRetry(action)) {
152            incrActionErrorCounter(action.getType(), "error", 1);
153            action.setPending();
154            if (isStart) {
155                action.setExecutionData(message, null);
156                queue(new ActionEndXCommand(action.getId(), action.getType()));
157            }
158            else {
159                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
160            }
161        }
162    }
163
164    /**
165     * Fail the job due to failed action
166     *
167     * @param context the execution context.
168     * @throws CommandException thrown if unable to fail job
169     */
170    public void failJob(ActionExecutor.Context context) throws CommandException {
171        ActionExecutorContext aContext = (ActionExecutorContext) context;
172        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
173        failJob(context, action);
174    }
175
176    /**
177     * Fail the job due to failed action
178     *
179     * @param context the execution context.
180     * @param action the action that caused the workflow to fail
181     * @throws CommandException thrown if unable to fail job
182     */
183    public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
184        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
185        if (!handleUserRetry(action)) {
186            incrActionErrorCounter(action.getType(), "failed", 1);
187            LOG.warn("Failing Job due to failed action [{0}]", action.getName());
188            try {
189                workflow.getWorkflowInstance().fail(action.getName());
190                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
191                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
192                workflow.setWorkflowInstance(wfInstance);
193                workflow.setStatus(WorkflowJob.Status.FAILED);
194                action.setStatus(WorkflowAction.Status.FAILED);
195                action.resetPending();
196                queue(new NotificationXCommand(workflow, action));
197                queue(new KillXCommand(workflow.getId()));
198                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation());
199            }
200            catch (WorkflowException ex) {
201                throw new CommandException(ex);
202            }
203        }
204    }
205
206    /**
207     * Execute retry for action if this action is eligible for user-retry
208     *
209     * @param context the execution context.
210     * @return true if user-retry has to be handled for this action
211     * @throws CommandException thrown if unable to fail job
212     */
213    public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
214        String errorCode = action.getErrorCode();
215        Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
216
217        if (allowedRetryCode.contains(errorCode) && action.getUserRetryCount() < action.getUserRetryMax()) {
218            LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
219                    + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
220                    .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
221            int interval = action.getUserRetryInterval() * 60 * 1000;
222            action.setStatus(WorkflowAction.Status.USER_RETRY);
223            action.incrmentUserRetryCount();
224            action.setPending();
225            queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
226            return true;
227        }
228        return false;
229    }
230
231        /*
232         * In case of action error increment the error count for instrumentation
233         */
234    private void incrActionErrorCounter(String type, String error, int count) {
235        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
236    }
237
238        /**
239         * Increment the action counter in the instrumentation log. indicating how
240         * many times the action was executed since the start Oozie server
241         */
242    protected void incrActionCounter(String type, int count) {
243        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
244    }
245
246        /**
247         * Adding a cron for the instrumentation time for the given Instrumentation
248         * group
249         */
250    protected void addActionCron(String type, Instrumentation.Cron cron) {
251        getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
252    }
253
254    /**
255     * Workflow action executor context
256     *
257     */
258    public static class ActionExecutorContext implements ActionExecutor.Context {
259        private final WorkflowJobBean workflow;
260        private Configuration protoConf;
261        private final WorkflowActionBean action;
262        private final boolean isRetry;
263        private final boolean isUserRetry;
264        private boolean started;
265        private boolean ended;
266        private boolean executed;
267
268                /**
269                 * Constructing the ActionExecutorContext, setting the private members
270                 * and constructing the proto configuration
271                 */
272        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
273            this.workflow = workflow;
274            this.action = action;
275            this.isRetry = isRetry;
276            this.isUserRetry = isUserRetry;
277            try {
278                protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
279            }
280            catch (IOException ex) {
281                throw new RuntimeException("It should not happen", ex);
282            }
283        }
284
285        /*
286         * (non-Javadoc)
287         * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
288         */
289        public String getCallbackUrl(String externalStatusVar) {
290            return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
291        }
292
293        /*
294         * (non-Javadoc)
295         * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
296         */
297        public Configuration getProtoActionConf() {
298            return protoConf;
299        }
300
301        /*
302         * (non-Javadoc)
303         * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
304         */
305        public WorkflowJob getWorkflow() {
306            return workflow;
307        }
308
309        /**
310         * Returns the workflow action of the given action context
311         *
312         * @return the workflow action of the given action context
313         */
314        public WorkflowAction getAction() {
315            return action;
316        }
317
318        /*
319         * (non-Javadoc)
320         * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
321         */
322        public ELEvaluator getELEvaluator() {
323            ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
324            DagELFunctions.configureEvaluator(evaluator, workflow, action);
325            return evaluator;
326        }
327
328        /*
329         * (non-Javadoc)
330         * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
331         */
332        public void setVar(String name, String value) {
333            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
334            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
335            wfInstance.setVar(name, value);
336            workflow.setWorkflowInstance(wfInstance);
337        }
338
339        /*
340         * (non-Javadoc)
341         * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
342         */
343        public String getVar(String name) {
344            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
345            return workflow.getWorkflowInstance().getVar(name);
346        }
347
348        /*
349         * (non-Javadoc)
350         * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
351         */
352        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
353            action.setStartData(externalId, trackerUri, consoleUrl);
354            started = true;
355        }
356
357        /**
358         * Setting the start time of the action
359         */
360        public void setStartTime() {
361            Date now = new Date();
362            action.setStartTime(now);
363        }
364
365        /*
366         * (non-Javadoc)
367         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
368         */
369        public void setExecutionData(String externalStatus, Properties actionData) {
370            action.setExecutionData(externalStatus, actionData);
371            executed = true;
372        }
373
374        /*
375         * (non-Javadoc)
376         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
377         */
378        public void setExecutionStats(String jsonStats) {
379            action.setExecutionStats(jsonStats);
380            executed = true;
381        }
382
383        /*
384         * (non-Javadoc)
385         * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
386         */
387        public void setExternalChildIDs(String externalChildIDs) {
388            action.setExternalChildIDs(externalChildIDs);
389            executed = true;
390        }
391
392        /*
393         * (non-Javadoc)
394         * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
395         */
396        public void setEndData(WorkflowAction.Status status, String signalValue) {
397            action.setEndData(status, signalValue);
398            ended = true;
399        }
400
401        /*
402         * (non-Javadoc)
403         * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
404         */
405        public boolean isRetry() {
406            return isRetry;
407        }
408
409        /**
410         * Return if the executor invocation is a user retry or not.
411         *
412         * @return if the executor invocation is a user retry or not.
413         */
414        public boolean isUserRetry() {
415            return isUserRetry;
416        }
417
418        /**
419         * Returns whether setStartData has been called or not.
420         *
421         * @return true if start completion info has been set.
422         */
423        public boolean isStarted() {
424            return started;
425        }
426
427        /**
428         * Returns whether setExecutionData has been called or not.
429         *
430         * @return true if execution completion info has been set, otherwise false.
431         */
432        public boolean isExecuted() {
433            return executed;
434        }
435
436        /**
437         * Returns whether setEndData has been called or not.
438         *
439         * @return true if end completion info has been set.
440         */
441        public boolean isEnded() {
442            return ended;
443        }
444
445        public void setExternalStatus(String externalStatus) {
446            action.setExternalStatus(externalStatus);
447        }
448
449        @Override
450        public String getRecoveryId() {
451            return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
452        }
453
454        /* (non-Javadoc)
455         * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
456         */
457        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
458            String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
459            FileSystem fs = getAppFileSystem();
460            String actionDirPath = Services.get().getSystemId() + "/" + name;
461            Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
462            return fqActionDir;
463        }
464
465        /* (non-Javadoc)
466         * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
467         */
468        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
469            WorkflowJob workflow = getWorkflow();
470            URI uri = new URI(getWorkflow().getAppPath());
471            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
472            Configuration fsConf = has.createJobConf(uri.getAuthority());
473            return has.createFileSystem(workflow.getUser(), uri, fsConf);
474
475        }
476
477        /* (non-Javadoc)
478         * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
479         */
480        @Override
481        public void setErrorInfo(String str, String exMsg) {
482            action.setErrorInfo(str, exMsg);
483        }
484    }
485
486}