This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.SLAEventBean;
026 import org.apache.oozie.WorkflowActionBean;
027 import org.apache.oozie.WorkflowJobBean;
028 import org.apache.oozie.XException;
029 import org.apache.oozie.client.SLAEvent.SlaAppType;
030 import org.apache.oozie.client.SLAEvent.Status;
031 import org.apache.oozie.client.rest.JsonBean;
032 import org.apache.oozie.command.CommandException;
033 import org.apache.oozie.command.PreconditionException;
034 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
035 import org.apache.oozie.executor.jpa.JPAExecutorException;
036 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
038 import org.apache.oozie.action.ActionExecutor;
039 import org.apache.oozie.action.ActionExecutorException;
040 import org.apache.oozie.service.ActionService;
041 import org.apache.oozie.service.JPAService;
042 import org.apache.oozie.service.UUIDService;
043 import org.apache.oozie.service.Services;
044 import org.apache.oozie.util.LogUtils;
045 import org.apache.oozie.util.Instrumentation;
046 import org.apache.oozie.util.db.SLADbXOperations;
047
048 /**
049 * Kill workflow action and invoke action executor to kill the underlying context.
050 *
051 */
052 public class ActionKillXCommand extends ActionXCommand<Void> {
053 private String actionId;
054 private String jobId;
055 private WorkflowJobBean wfJob;
056 private WorkflowActionBean wfAction;
057 private JPAService jpaService = null;
058 private List<JsonBean> updateList = new ArrayList<JsonBean>();
059 private List<JsonBean> insertList = new ArrayList<JsonBean>();
060
061 public ActionKillXCommand(String actionId, String type) {
062 super("action.kill", type, 0);
063 this.actionId = actionId;
064 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
065 }
066
067 public ActionKillXCommand(String actionId) {
068 this(actionId, "action.kill");
069 }
070
071 @Override
072 protected boolean isLockRequired() {
073 return true;
074 }
075
076 @Override
077 public String getEntityKey() {
078 return this.jobId;
079 }
080
081 @Override
082 protected void loadState() throws CommandException {
083 try {
084 jpaService = Services.get().get(JPAService.class);
085
086 if (jpaService != null) {
087 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
088 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
089 LogUtils.setLogInfo(wfJob, logInfo);
090 LogUtils.setLogInfo(wfAction, logInfo);
091 }
092 else {
093 throw new CommandException(ErrorCode.E0610);
094 }
095 }
096 catch (XException ex) {
097 throw new CommandException(ex);
098 }
099 }
100
101 @Override
102 protected void verifyPrecondition() throws CommandException, PreconditionException {
103 if (wfAction.getStatus() != WorkflowActionBean.Status.KILLED) {
104 throw new PreconditionException(ErrorCode.E0726, wfAction.getId());
105 }
106 }
107
108 @Override
109 protected Void execute() throws CommandException {
110 LOG.debug("STARTED WorkflowActionKillXCommand for action " + actionId);
111
112 if (wfAction.isPending()) {
113 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
114 if (executor != null) {
115 try {
116 boolean isRetry = false;
117 boolean isUserRetry = false;
118 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction,
119 isRetry, isUserRetry);
120 incrActionCounter(wfAction.getType(), 1);
121
122 Instrumentation.Cron cron = new Instrumentation.Cron();
123 cron.start();
124 executor.kill(context, wfAction);
125 cron.stop();
126 addActionCron(wfAction.getType(), cron);
127
128 wfAction.resetPending();
129 wfAction.setStatus(WorkflowActionBean.Status.KILLED);
130
131 updateList.add(wfAction);
132 wfJob.setLastModifiedTime(new Date());
133 updateList.add(wfJob);
134 // Add SLA status event (KILLED) for WF_ACTION
135 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
136 SlaAppType.WORKFLOW_ACTION);
137 if(slaEvent != null) {
138 insertList.add(slaEvent);
139 }
140 queue(new NotificationXCommand(wfJob, wfAction));
141 }
142 catch (ActionExecutorException ex) {
143 wfAction.resetPending();
144 wfAction.setStatus(WorkflowActionBean.Status.FAILED);
145 wfAction.setErrorInfo(ex.getErrorCode().toString(),
146 "KILL COMMAND FAILED - exception while executing job kill");
147 wfJob.setStatus(WorkflowJobBean.Status.KILLED);
148 updateList.add(wfAction);
149 wfJob.setLastModifiedTime(new Date());
150 updateList.add(wfJob);
151 // What will happen to WF and COORD_ACTION, NOTIFICATION?
152 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
153 SlaAppType.WORKFLOW_ACTION);
154 if(slaEvent != null) {
155 insertList.add(slaEvent);
156 }
157 LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]",
158 ex.getErrorCode(), ex.getMessage(), ex);
159 }
160 finally {
161 try {
162 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
163 }
164 catch (JPAExecutorException e) {
165 throw new CommandException(e);
166 }
167 }
168 }
169 }
170 LOG.debug("ENDED WorkflowActionKillXCommand for action " + actionId);
171 return null;
172 }
173
174 }