This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.oozie.ErrorCode;
021 import org.apache.oozie.WorkflowActionBean;
022 import org.apache.oozie.WorkflowJobBean;
023 import org.apache.oozie.XException;
024 import org.apache.oozie.client.SLAEvent.SlaAppType;
025 import org.apache.oozie.client.SLAEvent.Status;
026 import org.apache.oozie.command.CommandException;
027 import org.apache.oozie.command.PreconditionException;
028 import org.apache.oozie.executor.jpa.JPAExecutorException;
029 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
030 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
031 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
032 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
033 import org.apache.oozie.action.ActionExecutor;
034 import org.apache.oozie.action.ActionExecutorException;
035 import org.apache.oozie.service.ActionService;
036 import org.apache.oozie.service.JPAService;
037 import org.apache.oozie.service.UUIDService;
038 import org.apache.oozie.service.Services;
039 import org.apache.oozie.util.LogUtils;
040 import org.apache.oozie.util.Instrumentation;
041 import org.apache.oozie.util.db.SLADbXOperations;
042
043 /**
044 * Kill workflow action and invoke action executor to kill the underlying context.
045 *
046 */
047 public class ActionKillXCommand extends ActionXCommand<Void> {
048 private String actionId;
049 private String jobId;
050 private WorkflowJobBean wfJob;
051 private WorkflowActionBean wfAction;
052 private JPAService jpaService = null;
053
054 public ActionKillXCommand(String actionId, String type) {
055 super("action.kill", type, 0);
056 this.actionId = actionId;
057 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
058 }
059
060 public ActionKillXCommand(String actionId) {
061 this(actionId, "action.kill");
062 }
063
064 @Override
065 protected boolean isLockRequired() {
066 return true;
067 }
068
069 @Override
070 public String getEntityKey() {
071 return this.jobId;
072 }
073
074 @Override
075 protected void loadState() throws CommandException {
076 try {
077 jpaService = Services.get().get(JPAService.class);
078
079 if (jpaService != null) {
080 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
081 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
082 LogUtils.setLogInfo(wfJob, logInfo);
083 LogUtils.setLogInfo(wfAction, logInfo);
084 }
085 else {
086 throw new CommandException(ErrorCode.E0610);
087 }
088 }
089 catch (XException ex) {
090 throw new CommandException(ex);
091 }
092 }
093
094 @Override
095 protected void verifyPrecondition() throws CommandException, PreconditionException {
096 if (wfAction.getStatus() != WorkflowActionBean.Status.KILLED) {
097 throw new PreconditionException(ErrorCode.E0726, wfAction.getId());
098 }
099 }
100
101 @Override
102 protected Void execute() throws CommandException {
103 LOG.debug("STARTED WorkflowActionKillXCommand for action " + actionId);
104
105 if (wfAction.isPending()) {
106 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
107 if (executor != null) {
108 try {
109 boolean isRetry = false;
110 boolean isUserRetry = false;
111 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction,
112 isRetry, isUserRetry);
113 incrActionCounter(wfAction.getType(), 1);
114
115 Instrumentation.Cron cron = new Instrumentation.Cron();
116 cron.start();
117 executor.kill(context, wfAction);
118 cron.stop();
119 addActionCron(wfAction.getType(), cron);
120
121 wfAction.resetPending();
122 wfAction.setStatus(WorkflowActionBean.Status.KILLED);
123
124 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
125 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
126 // Add SLA status event (KILLED) for WF_ACTION
127 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.KILLED,
128 SlaAppType.WORKFLOW_ACTION);
129 queue(new NotificationXCommand(wfJob, wfAction));
130 }
131 catch (ActionExecutorException ex) {
132 wfAction.resetPending();
133 wfAction.setStatus(WorkflowActionBean.Status.FAILED);
134 wfAction.setErrorInfo(ex.getErrorCode().toString(),
135 "KILL COMMAND FAILED - exception while executing job kill");
136 wfJob.setStatus(WorkflowJobBean.Status.KILLED);
137 try {
138 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
139 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
140 }
141 catch (JPAExecutorException je) {
142 throw new CommandException(je);
143 }
144 // What will happen to WF and COORD_ACTION, NOTIFICATION?
145 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
146 SlaAppType.WORKFLOW_ACTION);
147 LOG.warn("Exception while executing kill(). Error Code [{0}], Message[{1}]",
148 ex.getErrorCode(), ex.getMessage(), ex);
149 }
150 catch (JPAExecutorException je) {
151 throw new CommandException(je);
152 }
153 }
154 }
155 LOG.debug("ENDED WorkflowActionKillXCommand for action " + actionId);
156 return null;
157 }
158
159 }