001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import java.util.Date;
021    
022    import org.apache.hadoop.conf.Configuration;
023    import org.apache.oozie.DagELFunctions;
024    import org.apache.oozie.ErrorCode;
025    import org.apache.oozie.WorkflowActionBean;
026    import org.apache.oozie.WorkflowJobBean;
027    import org.apache.oozie.action.ActionExecutor;
028    import org.apache.oozie.action.ActionExecutorException;
029    import org.apache.oozie.client.OozieClient;
030    import org.apache.oozie.client.WorkflowAction;
031    import org.apache.oozie.client.WorkflowJob;
032    import org.apache.oozie.client.SLAEvent.SlaAppType;
033    import org.apache.oozie.client.SLAEvent.Status;
034    import org.apache.oozie.command.CommandException;
035    import org.apache.oozie.service.ActionService;
036    import org.apache.oozie.service.Services;
037    import org.apache.oozie.service.UUIDService;
038    import org.apache.oozie.store.StoreException;
039    import org.apache.oozie.store.WorkflowStore;
040    import org.apache.oozie.util.Instrumentation;
041    import org.apache.oozie.util.XLog;
042    import org.apache.oozie.util.db.SLADbOperations;
043    import org.apache.oozie.workflow.WorkflowInstance;
044    
045    public class ActionEndCommand extends ActionCommand<Void> {
046        public static final String COULD_NOT_END = "COULD_NOT_END";
047        public static final String END_DATA_MISSING = "END_DATA_MISSING";
048    
049        private String id;
050        private String jobId = null;
051    
052        public ActionEndCommand(String id, String type) {
053            super("action.end", type, 0);
054            this.id = id;
055        }
056    
057        @Override
058        protected Void call(WorkflowStore store) throws StoreException, CommandException {
059            WorkflowJobBean workflow = store.getWorkflow(jobId, false);
060            setLogInfo(workflow);
061            WorkflowActionBean action = store.getAction(id, false);
062            setLogInfo(action);
063            if (action.isPending()
064                    && (action.getStatus() == WorkflowActionBean.Status.DONE
065                    || action.getStatus() == WorkflowActionBean.Status.END_RETRY || action.getStatus() == WorkflowActionBean.Status.END_MANUAL)) {
066                if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
067    
068                    ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
069                    Configuration conf = workflow.getWorkflowInstance().getConf();
070                    int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
071                    long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
072                    executor.setMaxRetries(maxRetries);
073                    executor.setRetryInterval(retryInterval);
074    
075                    if (executor != null) {
076                        boolean isRetry = false;
077                        if (action.getStatus() == WorkflowActionBean.Status.END_RETRY
078                                || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
079                            isRetry = true;
080                        }
081                        ActionExecutorContext context = new ActionCommand.ActionExecutorContext(workflow, action, isRetry);
082                        try {
083    
084                            XLog.getLog(getClass()).debug(
085                                    "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
086                                    action.getName(), action.getType(), action.getStatus(), action.getExternalStatus(),
087                                    action.getSignalValue());
088    
089                            Instrumentation.Cron cron = new Instrumentation.Cron();
090                            cron.start();
091                            executor.end(context, action);
092                            cron.stop();
093                            addActionCron(action.getType(), cron);
094    
095                            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
096                            DagELFunctions.setActionInfo(wfInstance, action);
097                            workflow.setWorkflowInstance(wfInstance);
098                            incrActionCounter(action.getType(), 1);
099    
100                            if (!context.isEnded()) {
101                                XLog.getLog(getClass()).warn(XLog.OPS,
102                                                             "Action Ended, ActionExecutor [{0}] must call setEndData()", executor.getType());
103                                action.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
104                                failJob(context);
105                                store.updateAction(action);
106                                store.updateWorkflow(workflow);
107                                return null;
108                            }
109                            action.setRetries(0);
110                            action.setEndTime(new Date());
111                            store.updateAction(action);
112                            store.updateWorkflow(workflow);
113                            Status slaStatus = null;
114                            switch (action.getStatus()) {
115                                case OK:
116                                    slaStatus = Status.SUCCEEDED;
117                                    break;
118                                case KILLED:
119                                    slaStatus = Status.KILLED;
120                                    break;
121                                case FAILED:
122                                    slaStatus = Status.FAILED;
123                                    break;
124                                case ERROR:
125                                    XLog.getLog(getClass()).info("ERROR is considered as FAILED for SLA");
126                                    slaStatus = Status.KILLED;
127                                    break;
128                                default: // TODO: What will happen for other Action
129                                    // status
130                                    slaStatus = Status.FAILED;
131                                    break;
132                            }
133                            SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, slaStatus,
134                                                            SlaAppType.WORKFLOW_ACTION);
135                            queueCallable(new NotificationCommand(workflow, action));
136                            XLog.getLog(getClass()).debug(
137                                    "Queuing commands for action=" + id + ", status=" + action.getStatus()
138                                            + ", Set pending=" + action.getPending());
139                            queueCallable(new SignalCommand(workflow.getId(), id));
140                        }
141                        catch (ActionExecutorException ex) {
142                            XLog.getLog(getClass()).warn(
143                                    "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
144                                    action.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
145                            action.setErrorInfo(ex.getErrorCode(), ex.getMessage());
146                            action.setEndTime(null);
147                            switch (ex.getErrorType()) {
148                                case TRANSIENT:
149                                    if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
150                                        handleNonTransient(store, context, executor, WorkflowAction.Status.END_MANUAL);
151                                        action.setPendingAge(new Date());
152                                        action.setRetries(0);
153                                    }
154                                    action.setEndTime(null);
155                                    break;
156                                case NON_TRANSIENT:
157                                    handleNonTransient(store, context, executor, WorkflowAction.Status.END_MANUAL);
158                                    action.setEndTime(null);
159                                    break;
160                                case ERROR:
161                                    handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
162                                    queueCallable(new SignalCommand(workflow.getId(), id));
163                                    break;
164                                case FAILED:
165                                    failJob(context);
166                                    break;
167                            }
168                            store.updateAction(action);
169                            store.updateWorkflow(workflow);
170                        }
171                    }
172                    else {
173                        throw new CommandException(ErrorCode.E0802, action.getType());
174                    }
175                }
176                else {
177                    XLog.getLog(getClass()).warn("Job state is not {0}. Skipping ActionEnd Execution",
178                                                 WorkflowJob.Status.RUNNING.toString());
179                }
180            }
181            else {
182                XLog.getLog(getClass()).debug("Action pending={0}, status={1}. Skipping ActionEnd Execution",
183                                              action.getPending(), action.getStatusStr());
184            }
185            return null;
186        }
187    
188        @Override
189        protected Void execute(WorkflowStore store) throws CommandException, StoreException {
190            XLog.getLog(getClass()).debug("STARTED ActionEndCommand for action " + id);
191            try {
192                jobId = Services.get().get(UUIDService.class).getId(id);
193                if (lock(jobId)) {
194                    call(store);
195                }
196                else {
197                    queueCallable(new ActionEndCommand(id, getType()), LOCK_FAILURE_REQUEUE_INTERVAL);
198                    XLog.getLog(getClass()).warn("ActionEnd lock was not acquired - failed {0}", id);
199                }
200            }
201            catch (InterruptedException e) {
202                queueCallable(new ActionEndCommand(id, getType()), LOCK_FAILURE_REQUEUE_INTERVAL);
203                XLog.getLog(getClass()).warn("ActionEnd lock was not acquired - interrupted exception failed {0}", id);
204            }
205            finally {
206                XLog.getLog(getClass()).debug("ENDED ActionEndCommand for action " + id);
207            }
208            return null;
209        }
210    }