This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.oozie.DagELFunctions;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.SLAEventBean;
028 import org.apache.oozie.WorkflowActionBean;
029 import org.apache.oozie.WorkflowJobBean;
030 import org.apache.oozie.XException;
031 import org.apache.oozie.action.ActionExecutor;
032 import org.apache.oozie.action.ActionExecutorException;
033 import org.apache.oozie.action.control.ControlNodeActionExecutor;
034 import org.apache.oozie.client.OozieClient;
035 import org.apache.oozie.client.WorkflowAction;
036 import org.apache.oozie.client.WorkflowJob;
037 import org.apache.oozie.client.SLAEvent.SlaAppType;
038 import org.apache.oozie.client.SLAEvent.Status;
039 import org.apache.oozie.client.rest.JsonBean;
040 import org.apache.oozie.command.CommandException;
041 import org.apache.oozie.command.PreconditionException;
042 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
043 import org.apache.oozie.executor.jpa.JPAExecutorException;
044 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
045 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
046 import org.apache.oozie.service.ActionService;
047 import org.apache.oozie.service.EventHandlerService;
048 import org.apache.oozie.service.JPAService;
049 import org.apache.oozie.service.Services;
050 import org.apache.oozie.service.UUIDService;
051 import org.apache.oozie.util.Instrumentation;
052 import org.apache.oozie.util.LogUtils;
053 import org.apache.oozie.util.XLog;
054 import org.apache.oozie.util.db.SLADbXOperations;
055 import org.apache.oozie.workflow.WorkflowInstance;
056
057 @SuppressWarnings("deprecation")
058 public class ActionEndXCommand extends ActionXCommand<Void> {
059 public static final String COULD_NOT_END = "COULD_NOT_END";
060 public static final String END_DATA_MISSING = "END_DATA_MISSING";
061
062 private String jobId = null;
063 private String actionId = null;
064 private WorkflowJobBean wfJob = null;
065 private WorkflowActionBean wfAction = null;
066 private JPAService jpaService = null;
067 private ActionExecutor executor = null;
068 private List<JsonBean> updateList = new ArrayList<JsonBean>();
069 private List<JsonBean> insertList = new ArrayList<JsonBean>();
070
071 public ActionEndXCommand(String actionId, String type) {
072 super("action.end", type, 0);
073 this.actionId = actionId;
074 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
075 }
076
077 @Override
078 protected boolean isLockRequired() {
079 return true;
080 }
081
082 @Override
083 public String getEntityKey() {
084 return this.jobId;
085 }
086
087 @Override
088 public String getKey() {
089 return getName() + "_" + actionId;
090 }
091
092 @Override
093 protected void loadState() throws CommandException {
094 try {
095 jpaService = Services.get().get(JPAService.class);
096 if (jpaService != null) {
097 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
098 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
099 LogUtils.setLogInfo(wfJob, logInfo);
100 LogUtils.setLogInfo(wfAction, logInfo);
101 }
102 else {
103 throw new CommandException(ErrorCode.E0610);
104 }
105 }
106 catch (XException ex) {
107 throw new CommandException(ex);
108 }
109 }
110
111 @Override
112 protected void verifyPrecondition() throws CommandException, PreconditionException {
113 if (wfJob == null) {
114 throw new PreconditionException(ErrorCode.E0604, jobId);
115 }
116 if (wfAction == null) {
117 throw new PreconditionException(ErrorCode.E0605, actionId);
118 }
119 if (wfAction.isPending()
120 && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
121 || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) {
122
123 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
124 throw new PreconditionException(ErrorCode.E0811, WorkflowJob.Status.RUNNING.toString());
125 }
126 }
127 else {
128 throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr());
129 }
130
131 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
132 if (executor == null) {
133 throw new CommandException(ErrorCode.E0802, wfAction.getType());
134 }
135 }
136
137 @Override
138 protected Void execute() throws CommandException {
139 LOG.debug("STARTED ActionEndXCommand for action " + actionId);
140
141 Configuration conf = wfJob.getWorkflowInstance().getConf();
142
143 int maxRetries = 0;
144 long retryInterval = 0;
145
146 if (!(executor instanceof ControlNodeActionExecutor)) {
147 maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
148 retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
149 }
150
151 executor.setMaxRetries(maxRetries);
152 executor.setRetryInterval(retryInterval);
153
154 boolean isRetry = false;
155 if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
156 || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
157 isRetry = true;
158 }
159 boolean isUserRetry = false;
160 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
161 try {
162
163 LOG.debug(
164 "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
165 wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(),
166 wfAction.getSignalValue());
167
168 Instrumentation.Cron cron = new Instrumentation.Cron();
169 cron.start();
170 executor.end(context, wfAction);
171 cron.stop();
172 addActionCron(wfAction.getType(), cron);
173
174 WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
175 DagELFunctions.setActionInfo(wfInstance, wfAction);
176 wfJob.setWorkflowInstance(wfInstance);
177 incrActionCounter(wfAction.getType(), 1);
178
179 if (!context.isEnded()) {
180 LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()",
181 executor.getType());
182 wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
183 failJob(context);
184 } else {
185 wfAction.setRetries(0);
186 wfAction.setEndTime(new Date());
187
188 boolean shouldHandleUserRetry = false;
189 Status slaStatus = null;
190 switch (wfAction.getStatus()) {
191 case OK:
192 slaStatus = Status.SUCCEEDED;
193 break;
194 case KILLED:
195 slaStatus = Status.KILLED;
196 break;
197 case FAILED:
198 slaStatus = Status.FAILED;
199 shouldHandleUserRetry = true;
200 break;
201 case ERROR:
202 LOG.info("ERROR is considered as FAILED for SLA");
203 slaStatus = Status.KILLED;
204 shouldHandleUserRetry = true;
205 break;
206 default:
207 slaStatus = Status.FAILED;
208 shouldHandleUserRetry = true;
209 break;
210 }
211 if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
212 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
213 LOG.debug("Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
214 + ", Set pending=" + wfAction.getPending());
215 if(slaEvent != null) {
216 insertList.add(slaEvent);
217 }
218 queue(new SignalXCommand(jobId, actionId));
219 }
220 }
221 updateList.add(wfAction);
222 wfJob.setLastModifiedTime(new Date());
223 updateList.add(wfJob);
224 }
225 catch (ActionExecutorException ex) {
226 LOG.warn(
227 "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
228 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
229 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
230 wfAction.setEndTime(null);
231
232 switch (ex.getErrorType()) {
233 case TRANSIENT:
234 if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
235 handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
236 wfAction.setPendingAge(new Date());
237 wfAction.setRetries(0);
238 }
239 wfAction.setEndTime(null);
240 break;
241 case NON_TRANSIENT:
242 handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
243 wfAction.setEndTime(null);
244 break;
245 case ERROR:
246 handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
247 queue(new SignalXCommand(jobId, actionId));
248 break;
249 case FAILED:
250 failJob(context);
251 break;
252 }
253
254 WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
255 DagELFunctions.setActionInfo(wfInstance, wfAction);
256 wfJob.setWorkflowInstance(wfInstance);
257
258 updateList.add(wfAction);
259 wfJob.setLastModifiedTime(new Date());
260 updateList.add(wfJob);
261 }
262 finally {
263 try {
264 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
265 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
266 generateEvent(wfAction, wfJob.getUser());
267 }
268 }
269 catch (JPAExecutorException e) {
270 throw new CommandException(e);
271 }
272 }
273
274 LOG.debug("ENDED ActionEndXCommand for action " + actionId);
275 return null;
276 }
277
278 }