001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.SLAEventBean; 026 import org.apache.oozie.WorkflowActionBean; 027 import org.apache.oozie.WorkflowJobBean; 028 import org.apache.oozie.XException; 029 import org.apache.oozie.client.SLAEvent.SlaAppType; 030 import org.apache.oozie.client.SLAEvent.Status; 031 import org.apache.oozie.client.rest.JsonBean; 032 import org.apache.oozie.command.CommandException; 033 import org.apache.oozie.command.PreconditionException; 034 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 035 import org.apache.oozie.executor.jpa.JPAExecutorException; 036 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 037 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 038 import org.apache.oozie.action.ActionExecutor; 039 import org.apache.oozie.action.ActionExecutorException; 040 import org.apache.oozie.action.control.ControlNodeActionExecutor; 041 import org.apache.oozie.service.ActionService; 042 import org.apache.oozie.service.EventHandlerService; 043 import org.apache.oozie.service.JPAService; 044 import org.apache.oozie.service.UUIDService; 045 import org.apache.oozie.service.Services; 046 import org.apache.oozie.util.LogUtils; 047 import org.apache.oozie.util.Instrumentation; 048 import org.apache.oozie.util.db.SLADbXOperations; 049 050 /** 051 * Kill workflow action and invoke action executor to kill the underlying context. 052 * 053 */ 054 @SuppressWarnings("deprecation") 055 public class ActionKillXCommand extends ActionXCommand<Void> { 056 private String actionId; 057 private String jobId; 058 private WorkflowJobBean wfJob; 059 private WorkflowActionBean wfAction; 060 private JPAService jpaService = null; 061 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 062 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 063 064 public ActionKillXCommand(String actionId, String type) { 065 super("action.kill", type, 0); 066 this.actionId = actionId; 067 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 068 } 069 070 public ActionKillXCommand(String actionId) { 071 this(actionId, "action.kill"); 072 } 073 074 @Override 075 protected boolean isLockRequired() { 076 return true; 077 } 078 079 @Override 080 public String getEntityKey() { 081 return this.jobId; 082 } 083 084 @Override 085 public String getKey() { 086 return getName() + "_" + this.actionId; 087 } 088 089 @Override 090 protected void loadState() throws CommandException { 091 try { 092 jpaService = Services.get().get(JPAService.class); 093 094 if (jpaService != null) { 095 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 096 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 097 LogUtils.setLogInfo(wfJob, logInfo); 098 LogUtils.setLogInfo(wfAction, logInfo); 099 } 100 else { 101 throw new CommandException(ErrorCode.E0610); 102 } 103 } 104 catch (XException ex) { 105 throw new CommandException(ex); 106 } 107 } 108 109 @Override 110 protected void verifyPrecondition() throws CommandException, PreconditionException { 111 if (wfAction.getStatus() != WorkflowActionBean.Status.KILLED) { 112 throw new PreconditionException(ErrorCode.E0726, wfAction.getId()); 113 } 114 } 115 116 @Override 117 protected Void execute() throws CommandException { 118 LOG.debug("STARTED WorkflowActionKillXCommand for action " + actionId); 119 120 if (wfAction.isPending()) { 121 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 122 if (executor != null) { 123 try { 124 boolean isRetry = false; 125 boolean isUserRetry = false; 126 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, 127 isRetry, isUserRetry); 128 incrActionCounter(wfAction.getType(), 1); 129 130 Instrumentation.Cron cron = new Instrumentation.Cron(); 131 cron.start(); 132 executor.kill(context, wfAction); 133 cron.stop(); 134 addActionCron(wfAction.getType(), cron); 135 136 wfAction.resetPending(); 137 wfAction.setStatus(WorkflowActionBean.Status.KILLED); 138 wfAction.setEndTime(new Date()); 139 140 updateList.add(wfAction); 141 wfJob.setLastModifiedTime(new Date()); 142 updateList.add(wfJob); 143 // Add SLA status event (KILLED) for WF_ACTION 144 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED, 145 SlaAppType.WORKFLOW_ACTION); 146 if(slaEvent != null) { 147 insertList.add(slaEvent); 148 } 149 queue(new NotificationXCommand(wfJob, wfAction)); 150 } 151 catch (ActionExecutorException ex) { 152 wfAction.resetPending(); 153 wfAction.setStatus(WorkflowActionBean.Status.FAILED); 154 wfAction.setErrorInfo(ex.getErrorCode().toString(), 155 "KILL COMMAND FAILED - exception while executing job kill"); 156 wfAction.setEndTime(new Date()); 157 158 wfJob.setStatus(WorkflowJobBean.Status.KILLED); 159 updateList.add(wfAction); 160 wfJob.setLastModifiedTime(new Date()); 161 updateList.add(wfJob); 162 // What will happen to WF and COORD_ACTION, NOTIFICATION? 163 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, 164 SlaAppType.WORKFLOW_ACTION); 165 if(slaEvent != null) { 166 insertList.add(slaEvent); 167 } 168 LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]", 169 ex.getErrorCode(), ex.getMessage(), ex); 170 } 171 finally { 172 try { 173 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList)); 174 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) { 175 generateEvent(wfAction, wfJob.getUser()); 176 } 177 } 178 catch (JPAExecutorException e) { 179 throw new CommandException(e); 180 } 181 } 182 } 183 } 184 LOG.debug("ENDED WorkflowActionKillXCommand for action " + actionId); 185 return null; 186 } 187 188 }