001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.util.Date;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Properties;
029import java.util.Set;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.oozie.DagELFunctions;
035import org.apache.oozie.ErrorCode;
036import org.apache.oozie.WorkflowActionBean;
037import org.apache.oozie.WorkflowJobBean;
038import org.apache.oozie.action.ActionExecutor;
039import org.apache.oozie.client.Job;
040import org.apache.oozie.client.WorkflowAction;
041import org.apache.oozie.client.WorkflowJob;
042import org.apache.oozie.command.CommandException;
043import org.apache.oozie.service.CallbackService;
044import org.apache.oozie.service.ConfigurationService;
045import org.apache.oozie.service.ELService;
046import org.apache.oozie.service.HadoopAccessorException;
047import org.apache.oozie.service.HadoopAccessorService;
048import org.apache.oozie.service.JPAService;
049import org.apache.oozie.service.LiteWorkflowStoreService;
050import org.apache.oozie.service.Services;
051import org.apache.oozie.util.ELEvaluator;
052import org.apache.oozie.util.InstrumentUtils;
053import org.apache.oozie.util.Instrumentation;
054import org.apache.oozie.util.XConfiguration;
055import org.apache.oozie.workflow.WorkflowException;
056import org.apache.oozie.workflow.WorkflowInstance;
057import org.apache.oozie.workflow.lite.LiteWorkflowApp;
058import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
059import org.apache.oozie.workflow.lite.NodeDef;
060
061/**
062 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
063 * attempting to start or end an action.
064 */
065public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
066    private static final String INSTRUMENTATION_GROUP = "action.executors";
067
068    protected static final String RECOVERY_ID_SEPARATOR = "@";
069
070    public ActionXCommand(String name, String type, int priority) {
071        super(name, type, priority);
072    }
073
074    /**
075     * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
076     * attempts have been made. Otherwise returns false.
077     *
078     * @param context the execution context.
079     * @param executor the executor instance being used.
080     * @param status the status to be set for the action.
081     * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
082     *         maximum number of configured retries.
083     * @throws CommandException thrown if unable to handle transient
084     */
085    protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
086            WorkflowAction.Status status) throws CommandException {
087        LOG.debug("Attempting to retry");
088        ActionExecutorContext aContext = (ActionExecutorContext) context;
089        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
090        incrActionErrorCounter(action.getType(), "transient", 1);
091
092        int actionRetryCount = action.getRetries();
093        if (actionRetryCount >= executor.getMaxRetries()) {
094            LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
095            return false;
096        }
097        else {
098            action.setStatus(status);
099            action.setPending();
100            action.incRetries();
101            long retryDelayMillis = getRetryDelay(actionRetryCount, executor.getRetryInterval(), executor.getRetryPolicy());
102            action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
103            LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
104            this.resetUsed();
105            queueCommandForTransientFailure(retryDelayMillis);
106            return true;
107        }
108    }
109
110    protected void queueCommandForTransientFailure(long retryDelayMillis){
111        queue(this, retryDelayMillis);
112    }
113    /**
114     * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
115     * set pending flag of action to false
116     *
117     * @param context the execution context.
118     * @param executor the executor instance being used.
119     * @param status the status to be set for the action.
120     * @throws CommandException thrown if unable to suspend job
121     */
122    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
123            WorkflowAction.Status status) throws CommandException {
124        ActionExecutorContext aContext = (ActionExecutorContext) context;
125        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
126        incrActionErrorCounter(action.getType(), "nontransient", 1);
127        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
128        String id = workflow.getId();
129        action.setStatus(status);
130        action.resetPendingOnly();
131        LOG.warn("Suspending Workflow Job id=" + id);
132        try {
133            SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
134        }
135        catch (Exception e) {
136            throw new CommandException(ErrorCode.E0727, id, e.getMessage());
137        }
138        finally {
139            updateParentIfNecessary(workflow, 3);
140        }
141    }
142
143    /**
144     * Takes care of errors. <p> For errors while attempting to start the action, the job state is updated and an
145     * {@link ActionEndXCommand} is queued. <p> For errors while attempting to end the action, the job state is updated.
146     * <p>
147     *
148     * @param context the execution context.
149     * @param executor the executor instance being used.
150     * @param message
151     * @param isStart whether the error was generated while starting or ending an action.
152     * @param status the status to be set for the action.
153     * @throws CommandException thrown if unable to handle action error
154     */
155    protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
156            boolean isStart, WorkflowAction.Status status) throws CommandException {
157        LOG.warn("Setting Action Status to [{0}]", status);
158        ActionExecutorContext aContext = (ActionExecutorContext) context;
159        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
160        WorkflowJobBean wfJob = (WorkflowJobBean) context.getWorkflow();
161        if (!handleUserRetry(action, wfJob)) {
162            incrActionErrorCounter(action.getType(), "error", 1);
163            action.setPending();
164            if (isStart) {
165                action.setExecutionData(message, null);
166                queue(new ActionEndXCommand(action.getId(), action.getType()));
167            }
168            else {
169                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
170            }
171        }
172    }
173
174    /**
175     * Fail the job due to failed action
176     *
177     * @param context the execution context.
178     * @throws CommandException thrown if unable to fail job
179     */
180    public void failJob(ActionExecutor.Context context) throws CommandException {
181        ActionExecutorContext aContext = (ActionExecutorContext) context;
182        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
183        failJob(context, action);
184    }
185
186    /**
187     * Fail the job due to failed action
188     *
189     * @param context the execution context.
190     * @param action the action that caused the workflow to fail
191     * @throws CommandException thrown if unable to fail job
192     */
193    public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
194        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
195        if (!handleUserRetry(action, workflow)) {
196            incrActionErrorCounter(action.getType(), "failed", 1);
197            LOG.warn("Failing Job due to failed action [{0}]", action.getName());
198            try {
199                workflow.getWorkflowInstance().fail(action.getName());
200                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
201                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
202                workflow.setWorkflowInstance(wfInstance);
203                workflow.setStatus(WorkflowJob.Status.FAILED);
204                action.setStatus(WorkflowAction.Status.FAILED);
205                action.resetPending();
206                queue(new WorkflowNotificationXCommand(workflow, action));
207                queue(new KillXCommand(workflow.getId()));
208                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation());
209            }
210            catch (WorkflowException ex) {
211                throw new CommandException(ex);
212            }
213        }
214    }
215
216    /**
217     * Execute retry for action if this action is eligible for user-retry
218     *
219     * @param action the Workflow action bean
220     * @return true if user-retry has to be handled for this action
221     * @throws CommandException thrown if unable to fail job
222     */
223    public boolean handleUserRetry(WorkflowActionBean action, WorkflowJobBean wfJob) throws CommandException {
224        String errorCode = action.getErrorCode();
225        Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
226
227        if ((allowedRetryCode.contains(LiteWorkflowStoreService.USER_ERROR_CODE_ALL) || allowedRetryCode.contains(errorCode))
228                && action.getUserRetryCount() < action.getUserRetryMax()) {
229            LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
230                    + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
231                    .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
232            ActionExecutor.RETRYPOLICY retryPolicy = getUserRetryPolicy(action, wfJob);
233            long interval = getRetryDelay(action.getUserRetryCount(), action.getUserRetryInterval() * 60, retryPolicy);
234            action.setStatus(WorkflowAction.Status.USER_RETRY);
235            action.incrmentUserRetryCount();
236            action.setPending();
237            queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
238            return true;
239        }
240        return false;
241    }
242
243        /*
244         * In case of action error increment the error count for instrumentation
245         */
246    private void incrActionErrorCounter(String type, String error, int count) {
247        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
248    }
249
250        /**
251         * Increment the action counter in the instrumentation log. indicating how
252         * many times the action was executed since the start Oozie server
253         */
254    protected void incrActionCounter(String type, int count) {
255        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
256    }
257
258        /**
259         * Adding a cron for the instrumentation time for the given Instrumentation
260         * group
261         */
262    protected void addActionCron(String type, Instrumentation.Cron cron) {
263        getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
264    }
265
266    /*
267     * Returns the next retry time in milliseconds, based on retry policy algorithm.
268     */
269    private long getRetryDelay(int retryCount, long retryInterval, ActionExecutor.RETRYPOLICY retryPolicy) {
270        switch (retryPolicy) {
271            case EXPONENTIAL:
272                long retryTime = ((long) Math.pow(2, retryCount) * retryInterval * 1000L);
273                return retryTime;
274            case PERIODIC:
275                return retryInterval * 1000L;
276            default:
277                throw new UnsupportedOperationException("Retry policy not supported");
278        }
279    }
280
281    /**
282     * Workflow action executor context
283     *
284     */
285    public static class ActionExecutorContext implements ActionExecutor.Context {
286        protected final WorkflowJobBean workflow;
287        private Configuration protoConf;
288        protected final WorkflowActionBean action;
289        private final boolean isRetry;
290        private final boolean isUserRetry;
291        private boolean started;
292        private boolean ended;
293        private boolean executed;
294        private boolean shouldEndWF;
295        private Job.Status jobStatus;
296
297        /**
298                 * Constructing the ActionExecutorContext, setting the private members
299                 * and constructing the proto configuration
300                 */
301        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
302            this.workflow = workflow;
303            this.action = action;
304            this.isRetry = isRetry;
305            this.isUserRetry = isUserRetry;
306            try {
307                protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
308            }
309            catch (IOException ex) {
310                throw new RuntimeException("It should not happen", ex);
311            }
312        }
313
314        /*
315         * (non-Javadoc)
316         * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
317         */
318        public String getCallbackUrl(String externalStatusVar) {
319            return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
320        }
321
322        /*
323         * (non-Javadoc)
324         * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
325         */
326        public Configuration getProtoActionConf() {
327            return protoConf;
328        }
329
330        /*
331         * (non-Javadoc)
332         * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
333         */
334        public WorkflowJob getWorkflow() {
335            return workflow;
336        }
337
338        /**
339         * Returns the workflow action of the given action context
340         *
341         * @return the workflow action of the given action context
342         */
343        public WorkflowAction getAction() {
344            return action;
345        }
346
347        /*
348         * (non-Javadoc)
349         * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
350         */
351        public ELEvaluator getELEvaluator() {
352            ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
353            DagELFunctions.configureEvaluator(evaluator, workflow, action);
354            return evaluator;
355        }
356
357        /*
358         * (non-Javadoc)
359         * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
360         */
361        public void setVar(String name, String value) {
362            setVarToWorkflow(name, value);
363        }
364
365        /**
366         * This is not thread safe, don't use if workflowjob is shared among multiple actions command
367         * @param name
368         * @param value
369         */
370        public void setVarToWorkflow(String name, String value) {
371            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
372            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
373            wfInstance.setVar(name, value);
374            workflow.setWorkflowInstance(wfInstance);
375        }
376
377        /*
378         * (non-Javadoc)
379         * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
380         */
381        public String getVar(String name) {
382            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
383            return workflow.getWorkflowInstance().getVar(name);
384        }
385
386        /*
387         * (non-Javadoc)
388         * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
389         */
390        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
391            action.setStartData(externalId, trackerUri, consoleUrl);
392            started = true;
393        }
394
395        /**
396         * Setting the start time of the action
397         */
398        public void setStartTime() {
399            Date now = new Date();
400            action.setStartTime(now);
401        }
402
403        /*
404         * (non-Javadoc)
405         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
406         */
407        public void setExecutionData(String externalStatus, Properties actionData) {
408            action.setExecutionData(externalStatus, actionData);
409            executed = true;
410        }
411
412        /*
413         * (non-Javadoc)
414         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
415         */
416        public void setExecutionStats(String jsonStats) {
417            action.setExecutionStats(jsonStats);
418            executed = true;
419        }
420
421        /*
422         * (non-Javadoc)
423         * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
424         */
425        public void setExternalChildIDs(String externalChildIDs) {
426            action.setExternalChildIDs(externalChildIDs);
427            executed = true;
428        }
429
430        /*
431         * (non-Javadoc)
432         * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
433         */
434        public void setEndData(WorkflowAction.Status status, String signalValue) {
435            action.setEndData(status, signalValue);
436            ended = true;
437        }
438
439        /*
440         * (non-Javadoc)
441         * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
442         */
443        public boolean isRetry() {
444            return isRetry;
445        }
446
447        /**
448         * Return if the executor invocation is a user retry or not.
449         *
450         * @return if the executor invocation is a user retry or not.
451         */
452        public boolean isUserRetry() {
453            return isUserRetry;
454        }
455
456        /**
457         * Returns whether setStartData has been called or not.
458         *
459         * @return true if start completion info has been set.
460         */
461        public boolean isStarted() {
462            return started;
463        }
464
465        /**
466         * Returns whether setExecutionData has been called or not.
467         *
468         * @return true if execution completion info has been set, otherwise false.
469         */
470        public boolean isExecuted() {
471            return executed;
472        }
473
474        /**
475         * Returns whether setEndData has been called or not.
476         *
477         * @return true if end completion info has been set.
478         */
479        public boolean isEnded() {
480            return ended;
481        }
482
483        public void setExternalStatus(String externalStatus) {
484            action.setExternalStatus(externalStatus);
485        }
486
487        @Override
488        public String getRecoveryId() {
489            return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
490        }
491
492        /* (non-Javadoc)
493         * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
494         */
495        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
496            String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
497            FileSystem fs = getAppFileSystem();
498            String actionDirPath = Services.get().getSystemId() + "/" + name;
499            Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
500            return fqActionDir;
501        }
502
503        /* (non-Javadoc)
504         * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
505         */
506        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
507            WorkflowJob workflow = getWorkflow();
508            URI uri = new URI(getWorkflow().getAppPath());
509            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
510            Configuration fsConf = has.createJobConf(uri.getAuthority());
511            return has.createFileSystem(workflow.getUser(), uri, fsConf);
512
513        }
514
515        /* (non-Javadoc)
516         * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
517         */
518        @Override
519        public void setErrorInfo(String str, String exMsg) {
520            action.setErrorInfo(str, exMsg);
521        }
522
523        public boolean isShouldEndWF() {
524            return shouldEndWF;
525        }
526
527        public void setShouldEndWF(boolean shouldEndWF) {
528            this.shouldEndWF = shouldEndWF;
529        }
530
531        public Job.Status getJobStatus() {
532            return jobStatus;
533        }
534
535        public void setJobStatus(Job.Status jobStatus) {
536            this.jobStatus = jobStatus;
537        }
538    }
539
540    public static class ForkedActionExecutorContext extends ActionExecutorContext {
541        private Map<String, String> contextVariableMap = new HashMap<String, String>();
542
543        public ForkedActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
544                boolean isUserRetry) {
545            super(workflow, action, isRetry, isUserRetry);
546        }
547
548        public void setVar(String name, String value) {
549            if (value != null) {
550                contextVariableMap.remove(name);
551            }
552            else {
553                contextVariableMap.put(name, value);
554            }
555        }
556
557        public String getVar(String name) {
558            return contextVariableMap.get(name);
559        }
560
561        public Map<String, String> getContextMap() {
562            return contextVariableMap;
563        }
564    }
565
566    /*
567     * Returns user retry policy
568     */
569    private ActionExecutor.RETRYPOLICY getUserRetryPolicy(WorkflowActionBean wfAction, WorkflowJobBean wfJob) {
570        WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
571        LiteWorkflowApp wfApp = (LiteWorkflowApp) wfInstance.getApp();
572        NodeDef nodeDef = wfApp.getNode(wfAction.getName());
573        if (nodeDef == null) {
574            return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
575        }
576        String userRetryPolicy = nodeDef.getUserRetryPolicy().toUpperCase();
577        String userRetryPolicyInSysConfig = ConfigurationService.get(LiteWorkflowStoreService.CONF_USER_RETRY_POLICY)
578                .toUpperCase();
579        if (isValidRetryPolicy(userRetryPolicy)) {
580            return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicy);
581        }
582        else if (isValidRetryPolicy(userRetryPolicyInSysConfig)) {
583            return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicyInSysConfig);
584        }
585        else {
586            return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
587        }
588    }
589
590    /*
591     * Returns true if policy is valid, otherwise false
592     */
593    private static boolean isValidRetryPolicy(String policy) {
594        try {
595            ActionExecutor.RETRYPOLICY.valueOf(policy.toUpperCase().trim());
596        }
597        catch (IllegalArgumentException e) {
598            // Invalid Policy
599            return false;
600        }
601        return true;
602    }
603
604}