001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import java.util.ArrayList;
022import java.util.Date;
023import java.util.List;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.DagELFunctions;
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.SLAEventBean;
029import org.apache.oozie.WorkflowActionBean;
030import org.apache.oozie.WorkflowJobBean;
031import org.apache.oozie.XException;
032import org.apache.oozie.action.ActionExecutor;
033import org.apache.oozie.action.ActionExecutorException;
034import org.apache.oozie.action.control.ControlNodeActionExecutor;
035import org.apache.oozie.client.OozieClient;
036import org.apache.oozie.client.WorkflowAction;
037import org.apache.oozie.client.WorkflowJob;
038import org.apache.oozie.client.SLAEvent.SlaAppType;
039import org.apache.oozie.client.SLAEvent.Status;
040import org.apache.oozie.client.rest.JsonBean;
041import org.apache.oozie.command.CommandException;
042import org.apache.oozie.command.PreconditionException;
043import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
044import org.apache.oozie.executor.jpa.BatchQueryExecutor;
045import org.apache.oozie.executor.jpa.JPAExecutorException;
046import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
047import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
048import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
049import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
050import org.apache.oozie.service.ActionService;
051import org.apache.oozie.service.EventHandlerService;
052import org.apache.oozie.service.JPAService;
053import org.apache.oozie.service.Services;
054import org.apache.oozie.service.UUIDService;
055import org.apache.oozie.util.Instrumentation;
056import org.apache.oozie.util.LogUtils;
057import org.apache.oozie.util.XLog;
058import org.apache.oozie.util.db.SLADbXOperations;
059import org.apache.oozie.workflow.WorkflowInstance;
060
061@SuppressWarnings("deprecation")
062public class ActionEndXCommand extends ActionXCommand<Void> {
063    public static final String COULD_NOT_END = "COULD_NOT_END";
064    public static final String END_DATA_MISSING = "END_DATA_MISSING";
065
066    private String jobId = null;
067    private String actionId = null;
068    private WorkflowJobBean wfJob = null;
069    private WorkflowActionBean wfAction = null;
070    private JPAService jpaService = null;
071    private ActionExecutor executor = null;
072    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
073    private List<JsonBean> insertList = new ArrayList<JsonBean>();
074
075    public ActionEndXCommand(String actionId, String type) {
076        super("action.end", type, 0);
077        this.actionId = actionId;
078        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
079    }
080
081    @Override
082    protected void setLogInfo() {
083        LogUtils.setLogInfo(actionId);
084    }
085
086    @Override
087    protected boolean isLockRequired() {
088        return true;
089    }
090
091    @Override
092    public String getEntityKey() {
093        return this.jobId;
094    }
095
096    @Override
097    public String getKey() {
098        return getName() + "_" + actionId;
099    }
100
101    @Override
102    protected void loadState() throws CommandException {
103        try {
104            jpaService = Services.get().get(JPAService.class);
105            if (jpaService != null) {
106                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_ACTION_OP,
107                        jobId);
108                this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_END,
109                        actionId);
110                LogUtils.setLogInfo(wfJob);
111                LogUtils.setLogInfo(wfAction);
112            }
113            else {
114                throw new CommandException(ErrorCode.E0610);
115            }
116        }
117        catch (XException ex) {
118            throw new CommandException(ex);
119        }
120    }
121
122    @Override
123    protected void verifyPrecondition() throws CommandException, PreconditionException {
124        if (wfJob == null) {
125            throw new PreconditionException(ErrorCode.E0604, jobId);
126        }
127        if (wfAction == null) {
128            throw new PreconditionException(ErrorCode.E0605, actionId);
129        }
130        if (wfAction.isPending()
131                && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
132                        || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) {
133
134            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
135                throw new PreconditionException(ErrorCode.E0811,  WorkflowJob.Status.RUNNING.toString());
136            }
137        }
138        else {
139            throw new PreconditionException(ErrorCode.E0812, wfAction.isPending(), wfAction.getStatusStr());
140        }
141
142        executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
143        if (executor == null) {
144            throw new CommandException(ErrorCode.E0802, wfAction.getType());
145        }
146    }
147
148    @Override
149    protected Void execute() throws CommandException {
150        LOG.debug("STARTED ActionEndXCommand for action " + actionId);
151
152        Configuration conf = wfJob.getWorkflowInstance().getConf();
153
154        int maxRetries = 0;
155        long retryInterval = 0;
156
157        if (!(executor instanceof ControlNodeActionExecutor)) {
158            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
159            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
160        }
161
162        executor.setMaxRetries(maxRetries);
163        executor.setRetryInterval(retryInterval);
164
165        boolean isRetry = false;
166        if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
167                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
168            isRetry = true;
169        }
170        boolean isUserRetry = false;
171        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
172        try {
173
174            LOG.debug(
175                    "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
176                    wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(),
177                    wfAction.getSignalValue());
178
179            Instrumentation.Cron cron = new Instrumentation.Cron();
180            cron.start();
181            executor.end(context, wfAction);
182            cron.stop();
183            addActionCron(wfAction.getType(), cron);
184
185            incrActionCounter(wfAction.getType(), 1);
186
187            if (!context.isEnded()) {
188                LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()",
189                        executor.getType());
190                wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
191                failJob(context);
192            } else {
193                wfAction.setRetries(0);
194                wfAction.setEndTime(new Date());
195
196                boolean shouldHandleUserRetry = false;
197                Status slaStatus = null;
198                switch (wfAction.getStatus()) {
199                    case OK:
200                        slaStatus = Status.SUCCEEDED;
201                        break;
202                    case KILLED:
203                        slaStatus = Status.KILLED;
204                        break;
205                    case FAILED:
206                        slaStatus = Status.FAILED;
207                        shouldHandleUserRetry = true;
208                        break;
209                    case ERROR:
210                        LOG.info("ERROR is considered as FAILED for SLA");
211                        slaStatus = Status.KILLED;
212                        shouldHandleUserRetry = true;
213                        break;
214                    default:
215                        slaStatus = Status.FAILED;
216                        shouldHandleUserRetry = true;
217                        break;
218                }
219                if (!shouldHandleUserRetry || !handleUserRetry(wfAction, wfJob)) {
220                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
221                    if(slaEvent != null) {
222                        insertList.add(slaEvent);
223                    }
224                }
225            }
226            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
227            DagELFunctions.setActionInfo(wfInstance, wfAction);
228            wfJob.setWorkflowInstance(wfInstance);
229
230            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
231            wfJob.setLastModifiedTime(new Date());
232            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
233        }
234        catch (ActionExecutorException ex) {
235            LOG.warn(
236                    "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
237                    wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
238            wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
239            wfAction.setEndTime(null);
240
241            switch (ex.getErrorType()) {
242                case TRANSIENT:
243                    if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
244                        handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
245                        wfAction.setPendingAge(new Date());
246                        wfAction.setRetries(0);
247                    }
248                    wfAction.setEndTime(null);
249                    break;
250                case NON_TRANSIENT:
251                    handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
252                    wfAction.setEndTime(null);
253                    break;
254                case ERROR:
255                    handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
256                    break;
257                case FAILED:
258                    failJob(context);
259                    break;
260            }
261
262            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
263            DagELFunctions.setActionInfo(wfInstance, wfAction);
264            wfJob.setWorkflowInstance(wfInstance);
265
266            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
267            wfJob.setLastModifiedTime(new Date());
268            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
269        }
270        finally {
271            try {
272                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
273            }
274            catch (JPAExecutorException e) {
275                throw new CommandException(e);
276            }
277            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
278                generateEvent(wfAction, wfJob.getUser());
279            }
280            new SignalXCommand(jobId, actionId).call();
281        }
282
283        LOG.debug("ENDED ActionEndXCommand for action " + actionId);
284        return null;
285    }
286
287}