001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.oozie.action.control.ControlNodeActionExecutor; 021 import org.apache.oozie.client.WorkflowJob; 022 import org.apache.oozie.client.SLAEvent.SlaAppType; 023 import org.apache.oozie.client.SLAEvent.Status; 024 import org.apache.oozie.client.rest.JsonBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.SLAEventBean; 027 import org.apache.oozie.WorkflowActionBean; 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.XException; 030 import org.apache.oozie.command.CommandException; 031 import org.apache.oozie.command.PreconditionException; 032 import org.apache.oozie.command.coord.CoordActionUpdateXCommand; 033 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 034 import org.apache.oozie.executor.jpa.JPAExecutorException; 035 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; 036 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 037 import org.apache.oozie.service.ActionService; 038 import org.apache.oozie.service.EventHandlerService; 039 import org.apache.oozie.service.JPAService; 040 import org.apache.oozie.service.Services; 041 import org.apache.oozie.workflow.WorkflowException; 042 import org.apache.oozie.workflow.WorkflowInstance; 043 import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 044 import org.apache.oozie.util.InstrumentUtils; 045 import org.apache.oozie.util.LogUtils; 046 import org.apache.oozie.util.ParamChecker; 047 import org.apache.oozie.util.db.SLADbXOperations; 048 049 import java.util.ArrayList; 050 import java.util.Date; 051 import java.util.List; 052 053 /** 054 * Kill workflow job and its workflow instance and queue a {@link WorkflowActionKillXCommand} to kill the workflow 055 * actions. 056 */ 057 @SuppressWarnings("deprecation") 058 public class KillXCommand extends WorkflowXCommand<Void> { 059 060 private String wfId; 061 private WorkflowJobBean wfJob; 062 private List<WorkflowActionBean> actionList; 063 private ActionService actionService; 064 private JPAService jpaService = null; 065 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 066 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 067 068 public KillXCommand(String wfId) { 069 super("kill", "kill", 1); 070 this.wfId = ParamChecker.notEmpty(wfId, "wfId"); 071 } 072 073 @Override 074 protected boolean isLockRequired() { 075 return true; 076 } 077 078 @Override 079 public String getEntityKey() { 080 return this.wfId; 081 } 082 083 @Override 084 public String getKey() { 085 return getName() + "_" + this.wfId; 086 } 087 088 @Override 089 protected void loadState() throws CommandException { 090 try { 091 jpaService = Services.get().get(JPAService.class); 092 if (jpaService != null) { 093 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId)); 094 this.actionList = jpaService.execute(new WorkflowActionsGetForJobJPAExecutor(wfId)); 095 LogUtils.setLogInfo(wfJob, logInfo); 096 } 097 else { 098 throw new CommandException(ErrorCode.E0610); 099 } 100 actionService = Services.get().get(ActionService.class); 101 } 102 catch (XException ex) { 103 throw new CommandException(ex); 104 } 105 } 106 107 @Override 108 protected void verifyPrecondition() throws CommandException, PreconditionException { 109 if (wfJob.getStatus() != WorkflowJob.Status.PREP && wfJob.getStatus() != WorkflowJob.Status.RUNNING 110 && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED && wfJob.getStatus() != WorkflowJob.Status.FAILED) { 111 throw new PreconditionException(ErrorCode.E0725, wfJob.getId()); 112 } 113 } 114 115 @Override 116 protected Void execute() throws CommandException { 117 LOG.info("STARTED WorkflowKillXCommand for jobId=" + wfId); 118 119 wfJob.setEndTime(new Date()); 120 121 if (wfJob.getStatus() != WorkflowJob.Status.FAILED) { 122 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 123 wfJob.setStatus(WorkflowJob.Status.KILLED); 124 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), 125 Status.KILLED, SlaAppType.WORKFLOW_JOB); 126 if(slaEvent != null) { 127 insertList.add(slaEvent); 128 } 129 try { 130 wfJob.getWorkflowInstance().kill(); 131 } 132 catch (WorkflowException e) { 133 throw new CommandException(ErrorCode.E0725, e.getMessage(), e); 134 } 135 WorkflowInstance wfInstance = wfJob.getWorkflowInstance(); 136 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.KILLED); 137 wfJob.setWorkflowInstance(wfInstance); 138 } 139 try { 140 for (WorkflowActionBean action : actionList) { 141 if (action.getStatus() == WorkflowActionBean.Status.RUNNING 142 || action.getStatus() == WorkflowActionBean.Status.DONE) { 143 action.setPending(); 144 action.setStatus(WorkflowActionBean.Status.KILLED); 145 146 updateList.add(action); 147 148 queue(new ActionKillXCommand(action.getId(), action.getType())); 149 } 150 else if (action.getStatus() == WorkflowActionBean.Status.PREP 151 || action.getStatus() == WorkflowActionBean.Status.START_RETRY 152 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL 153 || action.getStatus() == WorkflowActionBean.Status.END_RETRY 154 || action.getStatus() == WorkflowActionBean.Status.END_MANUAL) { 155 156 action.setStatus(WorkflowActionBean.Status.KILLED); 157 action.resetPending(); 158 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(), 159 Status.KILLED, SlaAppType.WORKFLOW_ACTION); 160 if(slaEvent != null) { 161 insertList.add(slaEvent); 162 } 163 updateList.add(action); 164 if (EventHandlerService.isEnabled() 165 && !(actionService.getExecutor(action.getType()) instanceof ControlNodeActionExecutor)) { 166 generateEvent(action, wfJob.getUser()); 167 } 168 } 169 } 170 wfJob.setLastModifiedTime(new Date()); 171 updateList.add(wfJob); 172 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList)); 173 if (EventHandlerService.isEnabled()) { 174 generateEvent(wfJob); 175 } 176 queue(new NotificationXCommand(wfJob)); 177 } 178 catch (JPAExecutorException e) { 179 throw new CommandException(e); 180 } 181 finally { 182 if(wfJob.getStatus() == WorkflowJob.Status.KILLED) { 183 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir 184 } 185 // update coordinator action 186 new CoordActionUpdateXCommand(wfJob).call(); 187 } 188 189 LOG.info("ENDED WorkflowKillXCommand for jobId=" + wfId); 190 return null; 191 } 192 193 }