001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.oozie.DagELFunctions;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.SLAEventBean;
028import org.apache.oozie.WorkflowActionBean;
029import org.apache.oozie.WorkflowJobBean;
030import org.apache.oozie.XException;
031import org.apache.oozie.action.ActionExecutor;
032import org.apache.oozie.action.ActionExecutorException;
033import org.apache.oozie.action.control.ControlNodeActionExecutor;
034import org.apache.oozie.client.OozieClient;
035import org.apache.oozie.client.WorkflowAction;
036import org.apache.oozie.client.WorkflowJob;
037import org.apache.oozie.client.SLAEvent.SlaAppType;
038import org.apache.oozie.client.SLAEvent.Status;
039import org.apache.oozie.client.rest.JsonBean;
040import org.apache.oozie.command.CommandException;
041import org.apache.oozie.command.PreconditionException;
042import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
043import org.apache.oozie.executor.jpa.BatchQueryExecutor;
044import org.apache.oozie.executor.jpa.JPAExecutorException;
045import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
046import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
047import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
048import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
049import org.apache.oozie.service.ActionService;
050import org.apache.oozie.service.EventHandlerService;
051import org.apache.oozie.service.JPAService;
052import org.apache.oozie.service.Services;
053import org.apache.oozie.service.UUIDService;
054import org.apache.oozie.util.Instrumentation;
055import org.apache.oozie.util.LogUtils;
056import org.apache.oozie.util.XLog;
057import org.apache.oozie.util.db.SLADbXOperations;
058import org.apache.oozie.workflow.WorkflowInstance;
059
060@SuppressWarnings("deprecation")
061public class ActionEndXCommand extends ActionXCommand<Void> {
062    public static final String COULD_NOT_END = "COULD_NOT_END";
063    public static final String END_DATA_MISSING = "END_DATA_MISSING";
064
065    private String jobId = null;
066    private String actionId = null;
067    private WorkflowJobBean wfJob = null;
068    private WorkflowActionBean wfAction = null;
069    private JPAService jpaService = null;
070    private ActionExecutor executor = null;
071    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
072    private List<JsonBean> insertList = new ArrayList<JsonBean>();
073
074    public ActionEndXCommand(String actionId, String type) {
075        super("action.end", type, 0);
076        this.actionId = actionId;
077        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
078    }
079
080    @Override
081    protected boolean isLockRequired() {
082        return true;
083    }
084
085    @Override
086    public String getEntityKey() {
087        return this.jobId;
088    }
089
090    @Override
091    public String getKey() {
092        return getName() + "_" + actionId;
093    }
094
095    @Override
096    protected void loadState() throws CommandException {
097        try {
098            jpaService = Services.get().get(JPAService.class);
099            if (jpaService != null) {
100                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_ACTION_OP,
101                        jobId);
102                this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_END,
103                        actionId);
104                LogUtils.setLogInfo(wfJob, logInfo);
105                LogUtils.setLogInfo(wfAction, logInfo);
106            }
107            else {
108                throw new CommandException(ErrorCode.E0610);
109            }
110        }
111        catch (XException ex) {
112            throw new CommandException(ex);
113        }
114    }
115
116    @Override
117    protected void verifyPrecondition() throws CommandException, PreconditionException {
118        if (wfJob == null) {
119            throw new PreconditionException(ErrorCode.E0604, jobId);
120        }
121        if (wfAction == null) {
122            throw new PreconditionException(ErrorCode.E0605, actionId);
123        }
124        if (wfAction.isPending()
125                && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
126                        || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) {
127
128            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
129                throw new PreconditionException(ErrorCode.E0811,  WorkflowJob.Status.RUNNING.toString());
130            }
131        }
132        else {
133            throw new PreconditionException(ErrorCode.E0812, wfAction.isPending(), wfAction.getStatusStr());
134        }
135
136        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
137        if (executor == null) {
138            throw new CommandException(ErrorCode.E0802, wfAction.getType());
139        }
140    }
141
142    @Override
143    protected Void execute() throws CommandException {
144        LOG.debug("STARTED ActionEndXCommand for action " + actionId);
145
146        Configuration conf = wfJob.getWorkflowInstance().getConf();
147
148        int maxRetries = 0;
149        long retryInterval = 0;
150
151        if (!(executor instanceof ControlNodeActionExecutor)) {
152            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
153            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
154        }
155
156        executor.setMaxRetries(maxRetries);
157        executor.setRetryInterval(retryInterval);
158
159        boolean isRetry = false;
160        if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
161                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
162            isRetry = true;
163        }
164        boolean isUserRetry = false;
165        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
166        try {
167
168            LOG.debug(
169                    "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
170                    wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(),
171                    wfAction.getSignalValue());
172
173            Instrumentation.Cron cron = new Instrumentation.Cron();
174            cron.start();
175            executor.end(context, wfAction);
176            cron.stop();
177            addActionCron(wfAction.getType(), cron);
178
179            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
180            DagELFunctions.setActionInfo(wfInstance, wfAction);
181            wfJob.setWorkflowInstance(wfInstance);
182            incrActionCounter(wfAction.getType(), 1);
183
184            if (!context.isEnded()) {
185                LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()",
186                        executor.getType());
187                wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
188                failJob(context);
189            } else {
190                wfAction.setRetries(0);
191                wfAction.setEndTime(new Date());
192
193                boolean shouldHandleUserRetry = false;
194                Status slaStatus = null;
195                switch (wfAction.getStatus()) {
196                    case OK:
197                        slaStatus = Status.SUCCEEDED;
198                        break;
199                    case KILLED:
200                        slaStatus = Status.KILLED;
201                        break;
202                    case FAILED:
203                        slaStatus = Status.FAILED;
204                        shouldHandleUserRetry = true;
205                        break;
206                    case ERROR:
207                        LOG.info("ERROR is considered as FAILED for SLA");
208                        slaStatus = Status.KILLED;
209                        shouldHandleUserRetry = true;
210                        break;
211                    default:
212                        slaStatus = Status.FAILED;
213                        shouldHandleUserRetry = true;
214                        break;
215                }
216                if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
217                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
218                    if(slaEvent != null) {
219                        insertList.add(slaEvent);
220                    }
221                }
222            }
223            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
224            wfJob.setLastModifiedTime(new Date());
225            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
226        }
227        catch (ActionExecutorException ex) {
228            LOG.warn(
229                    "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
230                    wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
231            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
232            wfAction.setEndTime(null);
233
234            switch (ex.getErrorType()) {
235                case TRANSIENT:
236                    if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
237                        handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
238                        wfAction.setPendingAge(new Date());
239                        wfAction.setRetries(0);
240                    }
241                    wfAction.setEndTime(null);
242                    break;
243                case NON_TRANSIENT:
244                    handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
245                    wfAction.setEndTime(null);
246                    break;
247                case ERROR:
248                    handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
249                    break;
250                case FAILED:
251                    failJob(context);
252                    break;
253            }
254
255            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
256            DagELFunctions.setActionInfo(wfInstance, wfAction);
257            wfJob.setWorkflowInstance(wfInstance);
258
259            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
260            wfJob.setLastModifiedTime(new Date());
261            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
262        }
263        finally {
264            try {
265                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
266            }
267            catch (JPAExecutorException e) {
268                throw new CommandException(e);
269            }
270            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
271                generateEvent(wfAction, wfJob.getUser());
272            }
273            new SignalXCommand(jobId, actionId).call(getEntityKey());
274        }
275
276        LOG.debug("ENDED ActionEndXCommand for action " + actionId);
277        return null;
278    }
279
280}