This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.Date;
021
022 import org.apache.hadoop.conf.Configuration;
023 import org.apache.oozie.DagELFunctions;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.WorkflowActionBean;
026 import org.apache.oozie.WorkflowJobBean;
027 import org.apache.oozie.XException;
028 import org.apache.oozie.action.ActionExecutor;
029 import org.apache.oozie.action.ActionExecutorException;
030 import org.apache.oozie.client.OozieClient;
031 import org.apache.oozie.client.WorkflowAction;
032 import org.apache.oozie.client.WorkflowJob;
033 import org.apache.oozie.client.SLAEvent.SlaAppType;
034 import org.apache.oozie.client.SLAEvent.Status;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.PreconditionException;
037 import org.apache.oozie.executor.jpa.JPAExecutorException;
038 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
039 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
040 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
041 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
042 import org.apache.oozie.service.ActionService;
043 import org.apache.oozie.service.JPAService;
044 import org.apache.oozie.service.Services;
045 import org.apache.oozie.service.UUIDService;
046 import org.apache.oozie.util.Instrumentation;
047 import org.apache.oozie.util.LogUtils;
048 import org.apache.oozie.util.XLog;
049 import org.apache.oozie.util.db.SLADbXOperations;
050 import org.apache.oozie.workflow.WorkflowInstance;
051
052 public class ActionEndXCommand extends ActionXCommand<Void> {
053 public static final String COULD_NOT_END = "COULD_NOT_END";
054 public static final String END_DATA_MISSING = "END_DATA_MISSING";
055
056 private String jobId = null;
057 private String actionId = null;
058 private WorkflowJobBean wfJob = null;
059 private WorkflowActionBean wfAction = null;
060 private JPAService jpaService = null;
061 private ActionExecutor executor = null;
062
063 public ActionEndXCommand(String actionId, String type) {
064 super("action.end", type, 0);
065 this.actionId = actionId;
066 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
067 }
068
069 @Override
070 protected boolean isLockRequired() {
071 return true;
072 }
073
074 @Override
075 protected String getEntityKey() {
076 return this.jobId;
077 }
078
079 @Override
080 protected void loadState() throws CommandException {
081 try {
082 jpaService = Services.get().get(JPAService.class);
083 if (jpaService != null) {
084 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
085 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
086 LogUtils.setLogInfo(wfJob, logInfo);
087 LogUtils.setLogInfo(wfAction, logInfo);
088 }
089 else {
090 throw new CommandException(ErrorCode.E0610);
091 }
092 }
093 catch (XException ex) {
094 throw new CommandException(ex);
095 }
096 }
097
098 @Override
099 protected void verifyPrecondition() throws CommandException, PreconditionException {
100 if (wfJob == null) {
101 throw new PreconditionException(ErrorCode.E0604, jobId);
102 }
103 if (wfAction == null) {
104 throw new PreconditionException(ErrorCode.E0605, actionId);
105 }
106 if (wfAction.isPending()
107 && (wfAction.getStatus() == WorkflowActionBean.Status.DONE
108 || wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL)) {
109
110 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
111 throw new PreconditionException(ErrorCode.E0811, WorkflowJob.Status.RUNNING.toString());
112 }
113 }
114 else {
115 throw new PreconditionException(ErrorCode.E0812, wfAction.getPending(), wfAction.getStatusStr());
116 }
117
118 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
119 if (executor == null) {
120 throw new CommandException(ErrorCode.E0802, wfAction.getType());
121 }
122 }
123
124 @Override
125 protected Void execute() throws CommandException {
126 LOG.debug("STARTED ActionEndXCommand for action " + actionId);
127
128 Configuration conf = wfJob.getWorkflowInstance().getConf();
129 int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
130 long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
131 executor.setMaxRetries(maxRetries);
132 executor.setRetryInterval(retryInterval);
133
134 boolean isRetry = false;
135 if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
136 || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
137 isRetry = true;
138 }
139 boolean isUserRetry = false;
140 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
141 try {
142
143 LOG.debug(
144 "End, name [{0}] type [{1}] status[{2}] external status [{3}] signal value [{4}]",
145 wfAction.getName(), wfAction.getType(), wfAction.getStatus(), wfAction.getExternalStatus(),
146 wfAction.getSignalValue());
147
148 Instrumentation.Cron cron = new Instrumentation.Cron();
149 cron.start();
150 executor.end(context, wfAction);
151 cron.stop();
152 addActionCron(wfAction.getType(), cron);
153
154 WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
155 DagELFunctions.setActionInfo(wfInstance, wfAction);
156 wfJob.setWorkflowInstance(wfInstance);
157 incrActionCounter(wfAction.getType(), 1);
158
159 if (!context.isEnded()) {
160 LOG.warn(XLog.OPS, "Action Ended, ActionExecutor [{0}] must call setEndData()",
161 executor.getType());
162 wfAction.setErrorInfo(END_DATA_MISSING, "Execution Ended, but End Data Missing from Action");
163 failJob(context);
164 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
165 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
166 return null;
167 }
168 wfAction.setRetries(0);
169 wfAction.setEndTime(new Date());
170
171 boolean shouldHandleUserRetry = false;
172 Status slaStatus = null;
173 switch (wfAction.getStatus()) {
174 case OK:
175 slaStatus = Status.SUCCEEDED;
176 break;
177 case KILLED:
178 slaStatus = Status.KILLED;
179 break;
180 case FAILED:
181 slaStatus = Status.FAILED;
182 shouldHandleUserRetry = true;
183 break;
184 case ERROR:
185 LOG.info("ERROR is considered as FAILED for SLA");
186 slaStatus = Status.KILLED;
187 shouldHandleUserRetry = true;
188 break;
189 default:
190 slaStatus = Status.FAILED;
191 shouldHandleUserRetry = true;
192 break;
193 }
194 if (!shouldHandleUserRetry || !handleUserRetry(wfAction)) {
195 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), slaStatus, SlaAppType.WORKFLOW_ACTION);
196 queue(new NotificationXCommand(wfJob, wfAction));
197 LOG.debug(
198 "Queuing commands for action=" + actionId + ", status=" + wfAction.getStatus()
199 + ", Set pending=" + wfAction.getPending());
200 queue(new SignalXCommand(jobId, actionId));
201 }
202
203 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
204 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
205 }
206 catch (ActionExecutorException ex) {
207 LOG.warn(
208 "Error ending action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
209 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage());
210 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
211 wfAction.setEndTime(null);
212
213 switch (ex.getErrorType()) {
214 case TRANSIENT:
215 if (!handleTransient(context, executor, WorkflowAction.Status.END_RETRY)) {
216 handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
217 wfAction.setPendingAge(new Date());
218 wfAction.setRetries(0);
219 }
220 wfAction.setEndTime(null);
221 break;
222 case NON_TRANSIENT:
223 handleNonTransient(context, executor, WorkflowAction.Status.END_MANUAL);
224 wfAction.setEndTime(null);
225 break;
226 case ERROR:
227 handleError(context, executor, COULD_NOT_END, false, WorkflowAction.Status.ERROR);
228 queue(new SignalXCommand(jobId, actionId));
229 break;
230 case FAILED:
231 failJob(context);
232 break;
233 }
234
235 WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
236 DagELFunctions.setActionInfo(wfInstance, wfAction);
237 wfJob.setWorkflowInstance(wfInstance);
238
239 try {
240 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
241 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
242 }
243 catch (JPAExecutorException je) {
244 throw new CommandException(je);
245 }
246
247 }
248 catch (JPAExecutorException je) {
249 throw new CommandException(je);
250 }
251
252
253 LOG.debug("ENDED ActionEndXCommand for action " + actionId);
254 return null;
255 }
256
257 }