This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.SLAEventBean;
026 import org.apache.oozie.WorkflowActionBean;
027 import org.apache.oozie.WorkflowJobBean;
028 import org.apache.oozie.XException;
029 import org.apache.oozie.client.SLAEvent.SlaAppType;
030 import org.apache.oozie.client.SLAEvent.Status;
031 import org.apache.oozie.client.rest.JsonBean;
032 import org.apache.oozie.command.CommandException;
033 import org.apache.oozie.command.PreconditionException;
034 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
035 import org.apache.oozie.executor.jpa.JPAExecutorException;
036 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
038 import org.apache.oozie.action.ActionExecutor;
039 import org.apache.oozie.action.ActionExecutorException;
040 import org.apache.oozie.action.control.ControlNodeActionExecutor;
041 import org.apache.oozie.service.ActionService;
042 import org.apache.oozie.service.EventHandlerService;
043 import org.apache.oozie.service.JPAService;
044 import org.apache.oozie.service.UUIDService;
045 import org.apache.oozie.service.Services;
046 import org.apache.oozie.util.LogUtils;
047 import org.apache.oozie.util.Instrumentation;
048 import org.apache.oozie.util.db.SLADbXOperations;
049
050 /**
051 * Kill workflow action and invoke action executor to kill the underlying context.
052 *
053 */
054 @SuppressWarnings("deprecation")
055 public class ActionKillXCommand extends ActionXCommand<Void> {
056 private String actionId;
057 private String jobId;
058 private WorkflowJobBean wfJob;
059 private WorkflowActionBean wfAction;
060 private JPAService jpaService = null;
061 private List<JsonBean> updateList = new ArrayList<JsonBean>();
062 private List<JsonBean> insertList = new ArrayList<JsonBean>();
063
064 public ActionKillXCommand(String actionId, String type) {
065 super("action.kill", type, 0);
066 this.actionId = actionId;
067 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
068 }
069
070 public ActionKillXCommand(String actionId) {
071 this(actionId, "action.kill");
072 }
073
074 @Override
075 protected boolean isLockRequired() {
076 return true;
077 }
078
079 @Override
080 public String getEntityKey() {
081 return this.jobId;
082 }
083
084 @Override
085 public String getKey() {
086 return getName() + "_" + this.actionId;
087 }
088
089 @Override
090 protected void loadState() throws CommandException {
091 try {
092 jpaService = Services.get().get(JPAService.class);
093
094 if (jpaService != null) {
095 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
096 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
097 LogUtils.setLogInfo(wfJob, logInfo);
098 LogUtils.setLogInfo(wfAction, logInfo);
099 }
100 else {
101 throw new CommandException(ErrorCode.E0610);
102 }
103 }
104 catch (XException ex) {
105 throw new CommandException(ex);
106 }
107 }
108
109 @Override
110 protected void verifyPrecondition() throws CommandException, PreconditionException {
111 if (wfAction.getStatus() != WorkflowActionBean.Status.KILLED) {
112 throw new PreconditionException(ErrorCode.E0726, wfAction.getId());
113 }
114 }
115
116 @Override
117 protected Void execute() throws CommandException {
118 LOG.debug("STARTED WorkflowActionKillXCommand for action " + actionId);
119
120 if (wfAction.isPending()) {
121 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
122 if (executor != null) {
123 try {
124 boolean isRetry = false;
125 boolean isUserRetry = false;
126 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction,
127 isRetry, isUserRetry);
128 incrActionCounter(wfAction.getType(), 1);
129
130 Instrumentation.Cron cron = new Instrumentation.Cron();
131 cron.start();
132 executor.kill(context, wfAction);
133 cron.stop();
134 addActionCron(wfAction.getType(), cron);
135
136 wfAction.resetPending();
137 wfAction.setStatus(WorkflowActionBean.Status.KILLED);
138 wfAction.setEndTime(new Date());
139
140 updateList.add(wfAction);
141 wfJob.setLastModifiedTime(new Date());
142 updateList.add(wfJob);
143 // Add SLA status event (KILLED) for WF_ACTION
144 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
145 SlaAppType.WORKFLOW_ACTION);
146 if(slaEvent != null) {
147 insertList.add(slaEvent);
148 }
149 queue(new NotificationXCommand(wfJob, wfAction));
150 }
151 catch (ActionExecutorException ex) {
152 wfAction.resetPending();
153 wfAction.setStatus(WorkflowActionBean.Status.FAILED);
154 wfAction.setErrorInfo(ex.getErrorCode().toString(),
155 "KILL COMMAND FAILED - exception while executing job kill");
156 wfAction.setEndTime(new Date());
157
158 wfJob.setStatus(WorkflowJobBean.Status.KILLED);
159 updateList.add(wfAction);
160 wfJob.setLastModifiedTime(new Date());
161 updateList.add(wfJob);
162 // What will happen to WF and COORD_ACTION, NOTIFICATION?
163 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
164 SlaAppType.WORKFLOW_ACTION);
165 if(slaEvent != null) {
166 insertList.add(slaEvent);
167 }
168 LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]",
169 ex.getErrorCode(), ex.getMessage(), ex);
170 }
171 finally {
172 try {
173 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
174 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
175 generateEvent(wfAction, wfJob.getUser());
176 }
177 }
178 catch (JPAExecutorException e) {
179 throw new CommandException(e);
180 }
181 }
182 }
183 }
184 LOG.debug("ENDED WorkflowActionKillXCommand for action " + actionId);
185 return null;
186 }
187
188 }