001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.wf; 019 020import org.apache.oozie.action.control.ControlNodeActionExecutor; 021import org.apache.oozie.client.WorkflowJob; 022import org.apache.oozie.client.SLAEvent.SlaAppType; 023import org.apache.oozie.client.SLAEvent.Status; 024import org.apache.oozie.client.rest.JsonBean; 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.SLAEventBean; 027import org.apache.oozie.WorkflowActionBean; 028import org.apache.oozie.WorkflowJobBean; 029import org.apache.oozie.XException; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.PreconditionException; 032import org.apache.oozie.executor.jpa.BatchQueryExecutor; 033import org.apache.oozie.executor.jpa.JPAExecutorException; 034import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 035import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; 036import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 037import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 038import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 039import org.apache.oozie.service.ActionService; 040import org.apache.oozie.service.EventHandlerService; 041import org.apache.oozie.service.JPAService; 042import org.apache.oozie.service.Services; 043import org.apache.oozie.workflow.WorkflowException; 044import org.apache.oozie.workflow.WorkflowInstance; 045import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 046import org.apache.oozie.util.InstrumentUtils; 047import org.apache.oozie.util.LogUtils; 048import org.apache.oozie.util.ParamChecker; 049import org.apache.oozie.util.db.SLADbXOperations; 050 051import java.util.ArrayList; 052import java.util.Date; 053import java.util.List; 054 055/** 056 * Kill workflow job and its workflow instance and queue a {@link WorkflowActionKillXCommand} to kill the workflow 057 * actions. 058 */ 059@SuppressWarnings("deprecation") 060public class KillXCommand extends WorkflowXCommand<Void> { 061 062 private String wfId; 063 private WorkflowJobBean wfJob; 064 private List<WorkflowActionBean> actionList; 065 private ActionService actionService; 066 private JPAService jpaService = null; 067 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 068 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 069 070 public KillXCommand(String wfId) { 071 super("kill", "kill", 1); 072 this.wfId = ParamChecker.notEmpty(wfId, "wfId"); 073 } 074 075 @Override 076 protected boolean isLockRequired() { 077 return true; 078 } 079 080 @Override 081 public String getEntityKey() { 082 return this.wfId; 083 } 084 085 @Override 086 public String getKey() { 087 return getName() + "_" + this.wfId; 088 } 089 090 @Override 091 protected void loadState() throws CommandException { 092 try { 093 jpaService = Services.get().get(JPAService.class); 094 if (jpaService != null) { 095 this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_KILL, wfId); 096 this.actionList = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(wfId)); 097 LogUtils.setLogInfo(wfJob, logInfo); 098 } 099 else { 100 throw new CommandException(ErrorCode.E0610); 101 } 102 actionService = Services.get().get(ActionService.class); 103 } 104 catch (XException ex) { 105 throw new CommandException(ex); 106 } 107 } 108 109 @Override 110 protected void verifyPrecondition() throws CommandException, PreconditionException { 111 if (wfJob.getStatus() != WorkflowJob.Status.PREP && wfJob.getStatus() != WorkflowJob.Status.RUNNING 112 && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED && wfJob.getStatus() != WorkflowJob.Status.FAILED) { 113 throw new PreconditionException(ErrorCode.E0725, wfJob.getId()); 114 } 115 } 116 117 @Override 118 protected Void execute() throws CommandException { 119 LOG.info("STARTED WorkflowKillXCommand for jobId=" + wfId); 120 121 wfJob.setEndTime(new Date()); 122 123 if (wfJob.getStatus() != WorkflowJob.Status.FAILED) { 124 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 125 wfJob.setStatus(WorkflowJob.Status.KILLED); 126 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), 127 Status.KILLED, SlaAppType.WORKFLOW_JOB); 128 if(slaEvent != null) { 129 insertList.add(slaEvent); 130 } 131 try { 132 wfJob.getWorkflowInstance().kill(); 133 } 134 catch (WorkflowException e) { 135 throw new CommandException(ErrorCode.E0725, e.getMessage(), e); 136 } 137 WorkflowInstance wfInstance = wfJob.getWorkflowInstance(); 138 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.KILLED); 139 wfJob.setWorkflowInstance(wfInstance); 140 } 141 try { 142 for (WorkflowActionBean action : actionList) { 143 if (action.getStatus() == WorkflowActionBean.Status.RUNNING 144 || action.getStatus() == WorkflowActionBean.Status.DONE) { 145 if (!(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) { 146 action.setPending(); 147 } 148 action.setStatus(WorkflowActionBean.Status.KILLED); 149 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action)); 150 151 queue(new ActionKillXCommand(action.getId(), action.getType())); 152 } 153 else if (action.getStatus() == WorkflowActionBean.Status.PREP 154 || action.getStatus() == WorkflowActionBean.Status.START_RETRY 155 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL 156 || action.getStatus() == WorkflowActionBean.Status.END_RETRY 157 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL 158 || action.getStatus() == WorkflowActionBean.Status.USER_RETRY) { 159 160 action.setStatus(WorkflowActionBean.Status.KILLED); 161 action.resetPending(); 162 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(), 163 Status.KILLED, SlaAppType.WORKFLOW_ACTION); 164 if(slaEvent != null) { 165 insertList.add(slaEvent); 166 } 167 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action)); 168 if (EventHandlerService.isEnabled() 169 && !(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) { 170 generateEvent(action, wfJob.getUser()); 171 } 172 } 173 } 174 wfJob.setLastModifiedTime(new Date()); 175 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob)); 176 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 177 if (EventHandlerService.isEnabled()) { 178 generateEvent(wfJob); 179 } 180 queue(new NotificationXCommand(wfJob)); 181 } 182 catch (JPAExecutorException e) { 183 throw new CommandException(e); 184 } 185 finally { 186 if(wfJob.getStatus() == WorkflowJob.Status.KILLED) { 187 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir 188 } 189 updateParentIfNecessary(wfJob); 190 } 191 192 LOG.info("ENDED WorkflowKillXCommand for jobId=" + wfId); 193 return null; 194 } 195 196}