001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.command.wf;
019
020import org.apache.oozie.action.control.ControlNodeActionExecutor;
021import org.apache.oozie.client.WorkflowJob;
022import org.apache.oozie.client.SLAEvent.SlaAppType;
023import org.apache.oozie.client.SLAEvent.Status;
024import org.apache.oozie.client.rest.JsonBean;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.SLAEventBean;
027import org.apache.oozie.WorkflowActionBean;
028import org.apache.oozie.WorkflowJobBean;
029import org.apache.oozie.XException;
030import org.apache.oozie.command.CommandException;
031import org.apache.oozie.command.PreconditionException;
032import org.apache.oozie.executor.jpa.BatchQueryExecutor;
033import org.apache.oozie.executor.jpa.JPAExecutorException;
034import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
035import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
036import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
037import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
038import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
039import org.apache.oozie.service.ActionService;
040import org.apache.oozie.service.EventHandlerService;
041import org.apache.oozie.service.JPAService;
042import org.apache.oozie.service.Services;
043import org.apache.oozie.workflow.WorkflowException;
044import org.apache.oozie.workflow.WorkflowInstance;
045import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
046import org.apache.oozie.util.InstrumentUtils;
047import org.apache.oozie.util.LogUtils;
048import org.apache.oozie.util.ParamChecker;
049import org.apache.oozie.util.db.SLADbXOperations;
050
051import java.util.ArrayList;
052import java.util.Date;
053import java.util.List;
054
055/**
056 * Kill workflow job and its workflow instance and queue a {@link WorkflowActionKillXCommand} to kill the workflow
057 * actions.
058 */
059@SuppressWarnings("deprecation")
060public class KillXCommand extends WorkflowXCommand<Void> {
061
062    private String wfId;
063    private WorkflowJobBean wfJob;
064    private List<WorkflowActionBean> actionList;
065    private ActionService actionService;
066    private JPAService jpaService = null;
067    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
068    private List<JsonBean> insertList = new ArrayList<JsonBean>();
069
070    public KillXCommand(String wfId) {
071        super("kill", "kill", 1);
072        this.wfId = ParamChecker.notEmpty(wfId, "wfId");
073    }
074
075    @Override
076    protected boolean isLockRequired() {
077        return true;
078    }
079
080    @Override
081    public String getEntityKey() {
082        return this.wfId;
083    }
084
085    @Override
086    public String getKey() {
087        return getName() + "_" + this.wfId;
088    }
089
090    @Override
091    protected void loadState() throws CommandException {
092        try {
093            jpaService = Services.get().get(JPAService.class);
094            if (jpaService != null) {
095                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_KILL, wfId);
096                this.actionList = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(wfId));
097                LogUtils.setLogInfo(wfJob, logInfo);
098            }
099            else {
100                throw new CommandException(ErrorCode.E0610);
101            }
102            actionService = Services.get().get(ActionService.class);
103        }
104        catch (XException ex) {
105            throw new CommandException(ex);
106        }
107    }
108
109    @Override
110    protected void verifyPrecondition() throws CommandException, PreconditionException {
111        if (wfJob.getStatus() != WorkflowJob.Status.PREP && wfJob.getStatus() != WorkflowJob.Status.RUNNING
112                && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED && wfJob.getStatus() != WorkflowJob.Status.FAILED) {
113            throw new PreconditionException(ErrorCode.E0725, wfJob.getId());
114        }
115    }
116
117    @Override
118    protected Void execute() throws CommandException {
119        LOG.info("STARTED WorkflowKillXCommand for jobId=" + wfId);
120
121        wfJob.setEndTime(new Date());
122
123        if (wfJob.getStatus() != WorkflowJob.Status.FAILED) {
124            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
125            wfJob.setStatus(WorkflowJob.Status.KILLED);
126            SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(),
127                    Status.KILLED, SlaAppType.WORKFLOW_JOB);
128            if(slaEvent != null) {
129                insertList.add(slaEvent);
130            }
131            try {
132                wfJob.getWorkflowInstance().kill();
133            }
134            catch (WorkflowException e) {
135                throw new CommandException(ErrorCode.E0725, e.getMessage(), e);
136            }
137            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
138            ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.KILLED);
139            wfJob.setWorkflowInstance(wfInstance);
140        }
141        try {
142            for (WorkflowActionBean action : actionList) {
143                if (action.getStatus() == WorkflowActionBean.Status.RUNNING
144                        || action.getStatus() == WorkflowActionBean.Status.DONE) {
145                    if (!(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) {
146                        action.setPending();
147                    }
148                    action.setStatus(WorkflowActionBean.Status.KILLED);
149                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
150
151                    queue(new ActionKillXCommand(action.getId(), action.getType()));
152                }
153                else if (action.getStatus() == WorkflowActionBean.Status.PREP
154                        || action.getStatus() == WorkflowActionBean.Status.START_RETRY
155                        || action.getStatus() == WorkflowActionBean.Status.START_MANUAL
156                        || action.getStatus() == WorkflowActionBean.Status.END_RETRY
157                        || action.getStatus() == WorkflowActionBean.Status.END_MANUAL
158                        || action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
159
160                    action.setStatus(WorkflowActionBean.Status.KILLED);
161                    action.resetPending();
162                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
163                            Status.KILLED, SlaAppType.WORKFLOW_ACTION);
164                    if(slaEvent != null) {
165                        insertList.add(slaEvent);
166                    }
167                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
168                    if (EventHandlerService.isEnabled()
169                            && !(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) {
170                        generateEvent(action, wfJob.getUser());
171                    }
172                }
173            }
174            wfJob.setLastModifiedTime(new Date());
175            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob));
176            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
177            if (EventHandlerService.isEnabled()) {
178                generateEvent(wfJob);
179            }
180            queue(new NotificationXCommand(wfJob));
181        }
182        catch (JPAExecutorException e) {
183            throw new CommandException(e);
184        }
185        finally {
186            if(wfJob.getStatus() == WorkflowJob.Status.KILLED) {
187                 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
188            }
189            updateParentIfNecessary(wfJob);
190        }
191
192        LOG.info("ENDED WorkflowKillXCommand for jobId=" + wfId);
193        return null;
194    }
195
196}