001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import org.apache.oozie.action.control.ControlNodeActionExecutor;
022import org.apache.oozie.client.WorkflowJob;
023import org.apache.oozie.client.SLAEvent.SlaAppType;
024import org.apache.oozie.client.SLAEvent.Status;
025import org.apache.oozie.client.rest.JsonBean;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.SLAEventBean;
028import org.apache.oozie.WorkflowActionBean;
029import org.apache.oozie.WorkflowJobBean;
030import org.apache.oozie.XException;
031import org.apache.oozie.command.CommandException;
032import org.apache.oozie.command.PreconditionException;
033import org.apache.oozie.executor.jpa.BatchQueryExecutor;
034import org.apache.oozie.executor.jpa.JPAExecutorException;
035import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
036import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
037import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
038import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
039import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
040import org.apache.oozie.service.ActionService;
041import org.apache.oozie.service.EventHandlerService;
042import org.apache.oozie.service.JPAService;
043import org.apache.oozie.service.Services;
044import org.apache.oozie.workflow.WorkflowException;
045import org.apache.oozie.workflow.WorkflowInstance;
046import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
047import org.apache.oozie.util.InstrumentUtils;
048import org.apache.oozie.util.LogUtils;
049import org.apache.oozie.util.ParamChecker;
050import org.apache.oozie.util.db.SLADbXOperations;
051
052import java.util.ArrayList;
053import java.util.Date;
054import java.util.List;
055
056/**
057 * Kill workflow job and its workflow instance and queue a {@link ActionKillXCommand} to kill the workflow
058 * actions.
059 */
060@SuppressWarnings("deprecation")
061public class KillXCommand extends WorkflowXCommand<Void> {
062
063    private String wfId;
064    private WorkflowJobBean wfJob;
065    private List<WorkflowActionBean> actionList;
066    private ActionService actionService;
067    private JPAService jpaService = null;
068    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
069    private List<JsonBean> insertList = new ArrayList<JsonBean>();
070
071    public KillXCommand(String wfId) {
072        super("kill", "kill", 1);
073        this.wfId = ParamChecker.notEmpty(wfId, "wfId");
074    }
075
076    @Override
077    protected void setLogInfo() {
078        LogUtils.setLogInfo(wfId);
079    }
080
081    @Override
082    protected boolean isLockRequired() {
083        return true;
084    }
085
086    @Override
087    public String getEntityKey() {
088        return this.wfId;
089    }
090
091    @Override
092    public String getKey() {
093        return getName() + "_" + this.wfId;
094    }
095
096    @Override
097    protected void loadState() throws CommandException {
098        try {
099            jpaService = Services.get().get(JPAService.class);
100            if (jpaService != null) {
101                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_KILL, wfId);
102                this.actionList = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(wfId));
103                LogUtils.setLogInfo(wfJob);
104            }
105            else {
106                throw new CommandException(ErrorCode.E0610);
107            }
108            actionService = Services.get().get(ActionService.class);
109        }
110        catch (XException ex) {
111            throw new CommandException(ex);
112        }
113    }
114
115    @Override
116    protected void verifyPrecondition() throws CommandException, PreconditionException {
117        if (wfJob.getStatus() != WorkflowJob.Status.PREP && wfJob.getStatus() != WorkflowJob.Status.RUNNING
118                && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED && wfJob.getStatus() != WorkflowJob.Status.FAILED) {
119            throw new PreconditionException(ErrorCode.E0725, wfJob.getId());
120        }
121    }
122
123    @Override
124    protected Void execute() throws CommandException {
125        LOG.info("STARTED WorkflowKillXCommand for jobId=" + wfId);
126
127        wfJob.setEndTime(new Date());
128
129        if (wfJob.getStatus() != WorkflowJob.Status.FAILED) {
130            InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
131            wfJob.setStatus(WorkflowJob.Status.KILLED);
132            SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(),
133                    Status.KILLED, SlaAppType.WORKFLOW_JOB);
134            if(slaEvent != null) {
135                insertList.add(slaEvent);
136            }
137            try {
138                wfJob.getWorkflowInstance().kill();
139            }
140            catch (WorkflowException e) {
141                throw new CommandException(ErrorCode.E0725, e.getMessage(), e);
142            }
143            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
144            ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.KILLED);
145            wfJob.setWorkflowInstance(wfInstance);
146        }
147        try {
148            for (WorkflowActionBean action : actionList) {
149                if (action.getStatus() == WorkflowActionBean.Status.RUNNING
150                        || action.getStatus() == WorkflowActionBean.Status.DONE) {
151                    if (!(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) {
152                        action.setPending();
153                    }
154                    action.setStatus(WorkflowActionBean.Status.KILLED);
155                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
156
157                    queue(new ActionKillXCommand(action.getId(), action.getType()));
158                }
159                else if (action.getStatus() == WorkflowActionBean.Status.PREP
160                        || action.getStatus() == WorkflowActionBean.Status.START_RETRY
161                        || action.getStatus() == WorkflowActionBean.Status.START_MANUAL
162                        || action.getStatus() == WorkflowActionBean.Status.END_RETRY
163                        || action.getStatus() == WorkflowActionBean.Status.END_MANUAL
164                        || action.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
165
166                    action.setStatus(WorkflowActionBean.Status.KILLED);
167                    action.resetPending();
168                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
169                            Status.KILLED, SlaAppType.WORKFLOW_ACTION);
170                    if(slaEvent != null) {
171                        insertList.add(slaEvent);
172                    }
173                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));
174                    if (EventHandlerService.isEnabled()
175                            && !(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) {
176                        generateEvent(action, wfJob.getUser());
177                    }
178                }
179            }
180            wfJob.setLastModifiedTime(new Date());
181            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob));
182            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
183            if (EventHandlerService.isEnabled()) {
184                generateEvent(wfJob);
185            }
186            queue(new WorkflowNotificationXCommand(wfJob));
187        }
188        catch (JPAExecutorException e) {
189            throw new CommandException(e);
190        }
191        finally {
192            if(wfJob.getStatus() == WorkflowJob.Status.KILLED) {
193                 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
194            }
195            updateParentIfNecessary(wfJob);
196        }
197
198        LOG.info("ENDED WorkflowKillXCommand for jobId=" + wfId);
199        return null;
200    }
201
202}