001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import org.apache.oozie.ErrorCode; 021 import org.apache.oozie.WorkflowActionBean; 022 import org.apache.oozie.WorkflowJobBean; 023 import org.apache.oozie.XException; 024 import org.apache.oozie.client.SLAEvent.SlaAppType; 025 import org.apache.oozie.client.SLAEvent.Status; 026 import org.apache.oozie.command.CommandException; 027 import org.apache.oozie.command.PreconditionException; 028 import org.apache.oozie.executor.jpa.JPAExecutorException; 029 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; 030 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor; 031 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 032 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor; 033 import org.apache.oozie.action.ActionExecutor; 034 import org.apache.oozie.action.ActionExecutorException; 035 import org.apache.oozie.service.ActionService; 036 import org.apache.oozie.service.JPAService; 037 import org.apache.oozie.service.UUIDService; 038 import org.apache.oozie.service.Services; 039 import org.apache.oozie.util.LogUtils; 040 import org.apache.oozie.util.Instrumentation; 041 import org.apache.oozie.util.db.SLADbXOperations; 042 043 /** 044 * Kill workflow action and invoke action executor to kill the underlying context. 045 * 046 */ 047 public class ActionKillXCommand extends ActionXCommand<Void> { 048 private String actionId; 049 private String jobId; 050 private WorkflowJobBean wfJob; 051 private WorkflowActionBean wfAction; 052 private JPAService jpaService = null; 053 054 public ActionKillXCommand(String actionId, String type) { 055 super("action.kill", type, 0); 056 this.actionId = actionId; 057 this.jobId = Services.get().get(UUIDService.class).getId(actionId); 058 } 059 060 public ActionKillXCommand(String actionId) { 061 this(actionId, "action.kill"); 062 } 063 064 @Override 065 protected boolean isLockRequired() { 066 return true; 067 } 068 069 @Override 070 protected String getEntityKey() { 071 return this.jobId; 072 } 073 074 @Override 075 protected void loadState() throws CommandException { 076 try { 077 jpaService = Services.get().get(JPAService.class); 078 079 if (jpaService != null) { 080 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); 081 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId)); 082 LogUtils.setLogInfo(wfJob, logInfo); 083 LogUtils.setLogInfo(wfAction, logInfo); 084 } 085 else { 086 throw new CommandException(ErrorCode.E0610); 087 } 088 } 089 catch (XException ex) { 090 throw new CommandException(ex); 091 } 092 } 093 094 @Override 095 protected void verifyPrecondition() throws CommandException, PreconditionException { 096 if (wfAction.getStatus() != WorkflowActionBean.Status.KILLED) { 097 throw new PreconditionException(ErrorCode.E0726, wfAction.getId()); 098 } 099 } 100 101 @Override 102 protected Void execute() throws CommandException { 103 LOG.debug("STARTED WorkflowActionKillXCommand for action " + actionId); 104 105 if (wfAction.isPending()) { 106 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType()); 107 if (executor != null) { 108 try { 109 boolean isRetry = false; 110 boolean isUserRetry = false; 111 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, 112 isRetry, isUserRetry); 113 incrActionCounter(wfAction.getType(), 1); 114 115 Instrumentation.Cron cron = new Instrumentation.Cron(); 116 cron.start(); 117 executor.kill(context, wfAction); 118 cron.stop(); 119 addActionCron(wfAction.getType(), cron); 120 121 wfAction.resetPending(); 122 wfAction.setStatus(WorkflowActionBean.Status.KILLED); 123 124 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 125 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 126 // Add SLA status event (KILLED) for WF_ACTION 127 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED, 128 SlaAppType.WORKFLOW_ACTION); 129 queue(new NotificationXCommand(wfJob, wfAction)); 130 } 131 catch (ActionExecutorException ex) { 132 wfAction.resetPending(); 133 wfAction.setStatus(WorkflowActionBean.Status.FAILED); 134 wfAction.setErrorInfo(ex.getErrorCode().toString(), 135 "KILL COMMAND FAILED - exception while executing job kill"); 136 wfJob.setStatus(WorkflowJobBean.Status.KILLED); 137 try { 138 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction)); 139 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 140 } 141 catch (JPAExecutorException je) { 142 throw new CommandException(je); 143 } 144 // What will happen to WF and COORD_ACTION, NOTIFICATION? 145 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED, 146 SlaAppType.WORKFLOW_ACTION); 147 LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]", 148 ex.getErrorCode(), ex.getMessage(), ex); 149 } 150 catch (JPAExecutorException je) { 151 throw new CommandException(je); 152 } 153 } 154 } 155 LOG.debug("ENDED WorkflowActionKillXCommand for action " + actionId); 156 return null; 157 } 158 159 }