/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.wf;

import org.apache.oozie.action.control.ControlNodeActionExecutor;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.SLAEvent.Status;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.db.SLADbXOperations;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Command that kills a workflow job together with its workflow instance, and queues an
 * {@link ActionKillXCommand} for every action of the job that is currently RUNNING or DONE.
 */
@SuppressWarnings("deprecation")
public class KillXCommand extends WorkflowXCommand<Void> {

    private String wfId;
    private WorkflowJobBean wfJob;
    private List<WorkflowActionBean> actionList;
    private ActionService actionService;
    private JPAService jpaService;
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    private List<JsonBean> insertList = new ArrayList<JsonBean>();

    /**
     * Create a kill command for the given workflow job.
     *
     * @param wfId id of the workflow job to kill; validated with {@link ParamChecker#notEmpty}
     */
    public KillXCommand(String wfId) {
        super("kill", "kill", 1);
        this.wfId = ParamChecker.notEmpty(wfId, "wfId");
    }

    /**
     * Put the workflow job id into the logging context.
     */
    @Override
    protected void setLogInfo() {
        LogUtils.setLogInfo(wfId);
    }

    /**
     * @return always <code>true</code>
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * @return the workflow job id
     */
    @Override
    public String getEntityKey() {
        return this.wfId;
    }

    /**
     * @return the command name and the workflow job id joined by an underscore
     */
    @Override
    public String getKey() {
        return getName() + "_" + this.wfId;
    }

    /**
     * Load the workflow job bean and its action beans, and look up the {@link ActionService}.
     *
     * @throws CommandException wrapping any {@link XException} raised while loading, including the
     *         {@link ErrorCode#E0610} error raised here when no {@link JPAService} is available
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                // Raised inside the try on purpose: it is re-wrapped by the catch below.
                throw new CommandException(ErrorCode.E0610);
            }
            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_KILL, wfId);
            this.actionList = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(wfId));
            LogUtils.setLogInfo(wfJob);
            actionService = Services.get().get(ActionService.class);
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Accept the job only when its status is PREP, RUNNING, SUSPENDED or FAILED.
     *
     * @throws PreconditionException with {@link ErrorCode#E0725} for any other job status
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        final WorkflowJob.Status current = wfJob.getStatus();
        final boolean killable = current == WorkflowJob.Status.PREP
                || current == WorkflowJob.Status.RUNNING
                || current == WorkflowJob.Status.SUSPENDED
                || current == WorkflowJob.Status.FAILED;
        if (!killable) {
            throw new PreconditionException(ErrorCode.E0725, wfJob.getId());
        }
    }

    /**
     * Kill the job (unless it is already FAILED), mark its actions as KILLED, persist all changes
     * in a single batch and queue the follow-up commands.
     *
     * @return always <code>null</code>
     * @throws CommandException if the workflow instance cannot be killed or the batch update fails
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.info("STARTED WorkflowKillXCommand for jobId=" + wfId);

        wfJob.setEndTime(new Date());

        if (wfJob.getStatus() != WorkflowJob.Status.FAILED) {
            killJobAndInstance();
        }

        try {
            for (WorkflowActionBean action : actionList) {
                final WorkflowActionBean.Status state = action.getStatus();

                if (isRunningOrDone(state)) {
                    // Control nodes are not flagged as pending.
                    if (!usesControlNodeExecutor(action)) {
                        action.setPending();
                    }
                    action.setStatus(WorkflowActionBean.Status.KILLED);
                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));

                    queue(new ActionKillXCommand(action.getId(), action.getType()));
                }
                else if (isWaitingToStartOrEnd(state)) {
                    action.setStatus(WorkflowActionBean.Status.KILLED);
                    action.resetPending();

                    SLAEventBean actionSlaEvent = SLADbXOperations.createStatusEvent(action.getSlaXml(),
                            action.getId(), Status.KILLED, SlaAppType.WORKFLOW_ACTION);
                    if (actionSlaEvent != null) {
                        insertList.add(actionSlaEvent);
                    }
                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, action));

                    // The executor lookup only happens when event handling is enabled.
                    if (EventHandlerService.isEnabled() && !usesControlNodeExecutor(action)) {
                        generateEvent(action, wfJob.getUser());
                    }
                }
            }

            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(
                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob));
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);

            if (EventHandlerService.isEnabled()) {
                generateEvent(wfJob);
            }
            queue(new WorkflowNotificationXCommand(wfJob));
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
        finally {
            if (wfJob.getStatus() == WorkflowJob.Status.KILLED) {
                // Deletes the workflow temp dir.
                new WfEndXCommand(wfJob).call();
            }
            updateParentIfNecessary(wfJob);
        }

        LOG.info("ENDED WorkflowKillXCommand for jobId=" + wfId);
        return null;
    }

    /**
     * Bump the job counter, set the job status to KILLED, record the job SLA event (if any) and
     * kill the workflow instance.
     *
     * @throws CommandException with {@link ErrorCode#E0725} if killing the workflow instance fails
     */
    private void killJobAndInstance() throws CommandException {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
        wfJob.setStatus(WorkflowJob.Status.KILLED);

        SLAEventBean jobSlaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(),
                Status.KILLED, SlaAppType.WORKFLOW_JOB);
        if (jobSlaEvent != null) {
            insertList.add(jobSlaEvent);
        }

        try {
            wfJob.getWorkflowInstance().kill();
        }
        catch (WorkflowException e) {
            throw new CommandException(ErrorCode.E0725, e.getMessage(), e);
        }

        // Fetch the instance again, force its status to KILLED and store it back on the job.
        WorkflowInstance instance = wfJob.getWorkflowInstance();
        ((LiteWorkflowInstance) instance).setStatus(WorkflowInstance.Status.KILLED);
        wfJob.setWorkflowInstance(instance);
    }

    /**
     * @param action action whose type is looked up in the {@link ActionService}
     * @return <code>true</code> if the executor for the action type is a {@link ControlNodeActionExecutor}
     */
    private boolean usesControlNodeExecutor(WorkflowActionBean action) {
        return actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor;
    }

    /**
     * @param state action status to test
     * @return <code>true</code> for RUNNING and DONE
     */
    private static boolean isRunningOrDone(WorkflowActionBean.Status state) {
        return state == WorkflowActionBean.Status.RUNNING
                || state == WorkflowActionBean.Status.DONE;
    }

    /**
     * @param state action status to test
     * @return <code>true</code> for PREP, START_RETRY, START_MANUAL, END_RETRY, END_MANUAL and USER_RETRY
     */
    private static boolean isWaitingToStartOrEnd(WorkflowActionBean.Status state) {
        return state == WorkflowActionBean.Status.PREP
                || state == WorkflowActionBean.Status.START_RETRY
                || state == WorkflowActionBean.Status.START_MANUAL
                || state == WorkflowActionBean.Status.END_RETRY
                || state == WorkflowActionBean.Status.END_MANUAL
                || state == WorkflowActionBean.Status.USER_RETRY;
    }

}