001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.List; 024 025import org.apache.hadoop.fs.FileStatus; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.SLAEventBean; 030import org.apache.oozie.WorkflowActionBean; 031import org.apache.oozie.WorkflowJobBean; 032import org.apache.oozie.XException; 033import org.apache.oozie.action.ActionExecutor; 034import org.apache.oozie.action.ActionExecutor.Context; 035import org.apache.oozie.action.ActionExecutorException; 036import org.apache.oozie.action.control.ControlNodeActionExecutor; 037import org.apache.oozie.client.SLAEvent.SlaAppType; 038import org.apache.oozie.client.SLAEvent.Status; 039import org.apache.oozie.client.rest.JsonBean; 040import org.apache.oozie.command.CommandException; 041import org.apache.oozie.command.PreconditionException; 042import org.apache.oozie.executor.jpa.BatchQueryExecutor; 043import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 044import org.apache.oozie.executor.jpa.JPAExecutorException; 045import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 046import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 047import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 048import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 049import org.apache.oozie.service.ActionService; 050import org.apache.oozie.service.EventHandlerService; 051import org.apache.oozie.service.JPAService; 052import org.apache.oozie.service.Services; 053import org.apache.oozie.service.UUIDService; 054import org.apache.oozie.util.Instrumentation; 055import org.apache.oozie.util.LogUtils; 056import org.apache.oozie.util.db.SLADbXOperations; 057 058/** 059 * Kill workflow action and invoke action executor to kill the underlying context. 060 * 061 */ 062@SuppressWarnings("deprecation") 063public class ActionKillXCommand extends ActionXCommand<Void> { 064 private String actionId; 065 private String jobId; 066 private WorkflowJobBean wfJob; 067 private WorkflowActionBean wfAction; 068 private JPAService jpaService = null; 069 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 070 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 071 072 public ActionKillXCommand(String actionId, String type) { 073 super("action.kill", type, 0); 074 this.actionId = actionId; 075 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 076 } 077 078 public ActionKillXCommand(String actionId) { 079 this(actionId, "action.kill"); 080 } 081 082 @Override 083 protected void setLogInfo() { 084 LogUtils.setLogInfo(actionId); 085 } 086 087 @Override 088 protected boolean isLockRequired() { 089 return true; 090 } 091 092 @Override 093 public String getEntityKey() { 094 return this.jobId; 095 } 096 097 @Override 098 public String getKey() { 099 return getName() + "_" + this.actionId; 100 } 101 102 @Override 103 protected void loadState() throws CommandException { 104 try { 105 jpaService = Services.get().get(JPAService.class); 106 107 if (jpaService != null) { 108 this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_ACTION_OP, jobId); 109 this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId); 110 LogUtils.setLogInfo(wfJob); 111 LogUtils.setLogInfo(wfAction); 112 } 113 else { 114 throw new CommandException(ErrorCode.E0610); 115 } 116 } 117 catch (XException ex) { 118 throw new CommandException(ex); 119 } 120 } 121 122 @Override 123 protected void verifyPrecondition() throws CommandException, PreconditionException { 124 if (wfAction.getStatus() != WorkflowActionBean.Status.KILLED) { 125 throw new PreconditionException(ErrorCode.E0726, wfAction.getId()); 126 } 127 } 128 129 @Override 130 protected Void execute() throws CommandException { 131 LOG.debug("STARTED WorkflowActionKillXCommand for action " + actionId); 132 133 if (wfAction.isPending()) { 134 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 135 if (executor != null) { 136 ActionExecutorContext context = null; 137 try { 138 boolean isRetry = false; 139 boolean isUserRetry = false; 140 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, 141 isRetry, isUserRetry); 142 incrActionCounter(wfAction.getType(), 1); 143 144 Instrumentation.Cron cron = new Instrumentation.Cron(); 145 cron.start(); 146 executor.kill(context, wfAction); 147 cron.stop(); 148 addActionCron(wfAction.getType(), cron); 149 150 wfAction.resetPending(); 151 wfAction.setStatus(WorkflowActionBean.Status.KILLED); 152 wfAction.setEndTime(new Date()); 153 154 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction)); 155 wfJob.setLastModifiedTime(new Date()); 156 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_MODTIME, wfJob)); 157 // Add SLA status event (KILLED) for WF_ACTION 158 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED, 159 SlaAppType.WORKFLOW_ACTION); 160 if(slaEvent != null) { 161 insertList.add(slaEvent); 162 } 163 queue(new WorkflowNotificationXCommand(wfJob, wfAction)); 164 } 165 catch (ActionExecutorException ex) { 166 wfAction.resetPending(); 167 wfAction.setStatus(WorkflowActionBean.Status.FAILED); 168 wfAction.setErrorInfo(ex.getErrorCode().toString(), 169 "KILL COMMAND FAILED - exception while executing job kill"); 170 wfAction.setEndTime(new Date()); 171 172 wfJob.setStatus(WorkflowJobBean.Status.KILLED); 173 updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction)); 174 wfJob.setLastModifiedTime(new Date()); 175 updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob)); 176 // What will happen to WF and COORD_ACTION, NOTIFICATION? 177 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, 178 SlaAppType.WORKFLOW_ACTION); 179 if(slaEvent != null) { 180 insertList.add(slaEvent); 181 } 182 LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]", 183 ex.getErrorCode(), ex.getMessage(), ex); 184 } 185 finally { 186 try { 187 cleanupActionDir(context); 188 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 189 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) { 190 generateEvent(wfAction, wfJob.getUser()); 191 } 192 } 193 catch (JPAExecutorException e) { 194 throw new CommandException(e); 195 } 196 } 197 } 198 } 199 LOG.debug("ENDED WorkflowActionKillXCommand for action " + actionId); 200 return null; 201 } 202 203 /* 204 * Cleans up the action directory 205 */ 206 private void cleanupActionDir(Context context) { 207 try { 208 FileSystem actionFs = context.getAppFileSystem(); 209 Path actionDir = context.getActionDir(); 210 Path jobDir = actionDir.getParent(); 211 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false) 212 && actionFs.exists(actionDir)) { 213 actionFs.delete(actionDir, true); 214 } 215 if (actionFs.exists(jobDir) && actionFs.getFileStatus(jobDir).isDir()) { 216 FileStatus[] statuses = actionFs.listStatus(jobDir); 217 if (statuses == null || statuses.length == 0) { 218 actionFs.delete(jobDir, true); 219 } 220 } 221 } 222 catch (Exception e) { 223 LOG.warn("Exception while cleaning up action dir. Message[{1}]", e.getMessage(), e); 224 } 225 } 226 227}