001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.util.ArrayList; 021 import java.util.Date; 022 import java.util.List; 023 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.SLAEventBean; 026 import org.apache.oozie.WorkflowActionBean; 027 import org.apache.oozie.WorkflowJobBean; 028 import org.apache.oozie.XException; 029 import org.apache.oozie.client.SLAEvent.SlaAppType; 030 import org.apache.oozie.client.SLAEvent.Status; 031 import org.apache.oozie.client.rest.JsonBean; 032 import org.apache.oozie.command.CommandException; 033 import org.apache.oozie.command.PreconditionException; 034 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 035 import org.apache.oozie.executor.jpa.JPAExecutorException; 036 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 037 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 038 import org.apache.oozie.action.ActionExecutor; 039 import org.apache.oozie.action.ActionExecutorException; 040 import org.apache.oozie.service.ActionService; 041 import org.apache.oozie.service.JPAService; 042 import org.apache.oozie.service.UUIDService; 043 import org.apache.oozie.service.Services; 044 import org.apache.oozie.util.LogUtils; 045 import org.apache.oozie.util.Instrumentation; 046 import org.apache.oozie.util.db.SLADbXOperations; 047 048 /** 049 * Kill workflow action and invoke action executor to kill the underlying context. 050 * 051 */ 052 public class ActionKillXCommand extends ActionXCommand<Void> { 053 private String actionId; 054 private String jobId; 055 private WorkflowJobBean wfJob; 056 private WorkflowActionBean wfAction; 057 private JPAService jpaService = null; 058 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 059 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 060 061 public ActionKillXCommand(String actionId, String type) { 062 super("action.kill", type, 0); 063 this.actionId = actionId; 064 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 065 } 066 067 public ActionKillXCommand(String actionId) { 068 this(actionId, "action.kill"); 069 } 070 071 @Override 072 protected boolean isLockRequired() { 073 return true; 074 } 075 076 @Override 077 public String getEntityKey() { 078 return this.jobId; 079 } 080 081 @Override 082 protected void loadState() throws CommandException { 083 try { 084 jpaService = Services.get().get(JPAService.class); 085 086 if (jpaService != null) { 087 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 088 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 089 LogUtils.setLogInfo(wfJob, logInfo); 090 LogUtils.setLogInfo(wfAction, logInfo); 091 } 092 else { 093 throw new CommandException(ErrorCode.E0610); 094 } 095 } 096 catch (XException ex) { 097 throw new CommandException(ex); 098 } 099 } 100 101 @Override 102 protected void verifyPrecondition() throws CommandException, PreconditionException { 103 if (wfAction.getStatus() != WorkflowActionBean.Status.KILLED) { 104 throw new PreconditionException(ErrorCode.E0726, wfAction.getId()); 105 } 106 } 107 108 @Override 109 protected Void execute() throws CommandException { 110 LOG.debug("STARTED WorkflowActionKillXCommand for action " + actionId); 111 112 if (wfAction.isPending()) { 113 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 114 if (executor != null) { 115 try { 116 boolean isRetry = false; 117 boolean isUserRetry = false; 118 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, 119 isRetry, isUserRetry); 120 incrActionCounter(wfAction.getType(), 1); 121 122 Instrumentation.Cron cron = new Instrumentation.Cron(); 123 cron.start(); 124 executor.kill(context, wfAction); 125 cron.stop(); 126 addActionCron(wfAction.getType(), cron); 127 128 wfAction.resetPending(); 129 wfAction.setStatus(WorkflowActionBean.Status.KILLED); 130 131 updateList.add(wfAction); 132 wfJob.setLastModifiedTime(new Date()); 133 updateList.add(wfJob); 134 // Add SLA status event (KILLED) for WF_ACTION 135 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED, 136 SlaAppType.WORKFLOW_ACTION); 137 if(slaEvent != null) { 138 insertList.add(slaEvent); 139 } 140 queue(new NotificationXCommand(wfJob, wfAction)); 141 } 142 catch (ActionExecutorException ex) { 143 wfAction.resetPending(); 144 wfAction.setStatus(WorkflowActionBean.Status.FAILED); 145 wfAction.setErrorInfo(ex.getErrorCode().toString(), 146 "KILL COMMAND FAILED - exception while executing job kill"); 147 wfJob.setStatus(WorkflowJobBean.Status.KILLED); 148 updateList.add(wfAction); 149 wfJob.setLastModifiedTime(new Date()); 150 updateList.add(wfJob); 151 // What will happen to WF and COORD_ACTION, NOTIFICATION? 152 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, 153 SlaAppType.WORKFLOW_ACTION); 154 if(slaEvent != null) { 155 insertList.add(slaEvent); 156 } 157 LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]", 158 ex.getErrorCode(), ex.getMessage(), ex); 159 } 160 finally { 161 try { 162 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList)); 163 } 164 catch (JPAExecutorException e) { 165 throw new CommandException(e); 166 } 167 } 168 } 169 } 170 LOG.debug("ENDED WorkflowActionKillXCommand for action " + actionId); 171 return null; 172 } 173 174 }