001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.wf;
019    
020    import org.apache.oozie.client.WorkflowJob;
021    import org.apache.oozie.client.SLAEvent.SlaAppType;
022    import org.apache.oozie.client.SLAEvent.Status;
023    import org.apache.oozie.ErrorCode;
024    import org.apache.oozie.WorkflowActionBean;
025    import org.apache.oozie.WorkflowJobBean;
026    import org.apache.oozie.XException;
027    import org.apache.oozie.command.CommandException;
028    import org.apache.oozie.command.PreconditionException;
029    import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
030    import org.apache.oozie.executor.jpa.JPAExecutorException;
031    import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
032    import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
033    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
034    import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
035    import org.apache.oozie.service.JPAService;
036    import org.apache.oozie.service.Services;
037    import org.apache.oozie.workflow.WorkflowException;
038    import org.apache.oozie.workflow.WorkflowInstance;
039    import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
040    import org.apache.oozie.util.InstrumentUtils;
041    import org.apache.oozie.util.LogUtils;
042    import org.apache.oozie.util.ParamChecker;
043    import org.apache.oozie.util.db.SLADbXOperations;
044    
045    import java.util.Date;
046    import java.util.List;
047    
048    /**
049     * Kill workflow job and its workflow instance and queue a {@link WorkflowActionKillXCommand} to kill the workflow
050     * actions.
051     */
052    public class KillXCommand extends WorkflowXCommand<Void> {
053    
054        private String wfId;
055        private WorkflowJobBean wfJob;
056        private List<WorkflowActionBean> actionList;
057        private JPAService jpaService = null;
058    
059        public KillXCommand(String wfId) {
060            super("kill", "kill", 1);
061            this.wfId = ParamChecker.notEmpty(wfId, "wfId");
062        }
063    
064        @Override
065        protected boolean isLockRequired() {
066            return true;
067        }
068    
069        @Override
070        protected String getEntityKey() {
071            return this.wfId;
072        }
073    
074        @Override
075        protected void loadState() throws CommandException {
076            try {
077                jpaService = Services.get().get(JPAService.class);
078                if (jpaService != null) {
079                    this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId));
080                    this.actionList = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(wfId));
081                    LogUtils.setLogInfo(wfJob, logInfo);
082                }
083                else {
084                    throw new CommandException(ErrorCode.E0610);
085                }
086            }
087            catch (XException ex) {
088                throw new CommandException(ex);
089            }
090        }
091    
092        @Override
093        protected void verifyPrecondition() throws CommandException, PreconditionException {
094            if (wfJob.getStatus() != WorkflowJob.Status.PREP && wfJob.getStatus() != WorkflowJob.Status.RUNNING
095                    && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED && wfJob.getStatus() != WorkflowJob.Status.FAILED) {
096                throw new PreconditionException(ErrorCode.E0725, wfJob.getId());
097            }
098        }
099    
100        @Override
101        protected Void execute() throws CommandException {
102            LOG.info("STARTED WorkflowKillXCommand for jobId=" + wfId);
103    
104            wfJob.setEndTime(new Date());
105    
106            if (wfJob.getStatus() != WorkflowJob.Status.FAILED) {
107                InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
108                wfJob.setStatus(WorkflowJob.Status.KILLED);
109                SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), wfJob.getId(), Status.KILLED, SlaAppType.WORKFLOW_JOB);
110                try {
111                    wfJob.getWorkflowInstance().kill();
112                }
113                catch (WorkflowException e) {
114                    throw new CommandException(ErrorCode.E0725, e.getMessage(), e);
115                }
116                WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
117                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.KILLED);
118                wfJob.setWorkflowInstance(wfInstance);
119            }
120            try {
121                for (WorkflowActionBean action : actionList) {
122                    if (action.getStatus() == WorkflowActionBean.Status.RUNNING
123                            || action.getStatus() == WorkflowActionBean.Status.DONE) {
124                        action.setPending();
125                        action.setStatus(WorkflowActionBean.Status.KILLED);
126    
127                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
128    
129                        queue(new ActionKillXCommand(action.getId(), action.getType()));
130                    }
131                    if (action.getStatus() == WorkflowActionBean.Status.PREP
132                            || action.getStatus() == WorkflowActionBean.Status.START_RETRY
133                            || action.getStatus() == WorkflowActionBean.Status.START_MANUAL
134                            || action.getStatus() == WorkflowActionBean.Status.END_RETRY
135                            || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
136    
137                        action.setStatus(WorkflowActionBean.Status.KILLED);
138                        action.resetPending();
139                        SLADbXOperations.writeStausEvent(action.getSlaXml(), action.getId(), Status.KILLED,
140                                SlaAppType.WORKFLOW_ACTION);
141                        jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
142                    }
143                }
144                jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
145                queue(new NotificationXCommand(wfJob));
146            }
147            catch (JPAExecutorException je) {
148                throw new CommandException(je);
149            }
150            finally {
151                if(wfJob.getStatus() == WorkflowJob.Status.KILLED) {
152                     new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
153                }
154                // update coordinator action
155                new CoordActionUpdateXCommand(wfJob).call();
156            }
157    
158            LOG.info("ENDED WorkflowKillXCommand for jobId=" + wfId);
159            return null;
160        }
161    
162    }