This project has retired. For details, please refer to its Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import javax.servlet.jsp.el.ELException;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.FaultInjection;
028 import org.apache.oozie.SLAEventBean;
029 import org.apache.oozie.WorkflowActionBean;
030 import org.apache.oozie.WorkflowJobBean;
031 import org.apache.oozie.XException;
032 import org.apache.oozie.action.ActionExecutor;
033 import org.apache.oozie.action.ActionExecutorException;
034 import org.apache.oozie.action.control.ControlNodeActionExecutor;
035 import org.apache.oozie.client.OozieClient;
036 import org.apache.oozie.client.WorkflowAction;
037 import org.apache.oozie.client.WorkflowJob;
038 import org.apache.oozie.client.SLAEvent.SlaAppType;
039 import org.apache.oozie.client.SLAEvent.Status;
040 import org.apache.oozie.client.rest.JsonBean;
041 import org.apache.oozie.command.CommandException;
042 import org.apache.oozie.command.PreconditionException;
043 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
044 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
045 import org.apache.oozie.executor.jpa.JPAExecutorException;
046 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
047 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
048 import org.apache.oozie.service.ActionService;
049 import org.apache.oozie.service.EventHandlerService;
050 import org.apache.oozie.service.JPAService;
051 import org.apache.oozie.service.Services;
052 import org.apache.oozie.service.UUIDService;
053 import org.apache.oozie.util.ELEvaluationException;
054 import org.apache.oozie.util.Instrumentation;
055 import org.apache.oozie.util.LogUtils;
056 import org.apache.oozie.util.XLog;
057 import org.apache.oozie.util.XmlUtils;
058 import org.apache.oozie.util.db.SLADbXOperations;
059
060 @SuppressWarnings("deprecation")
061 public class ActionStartXCommand extends ActionXCommand<Void> {
062 public static final String EL_ERROR = "EL_ERROR";
063 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
064 public static final String COULD_NOT_START = "COULD_NOT_START";
065 public static final String START_DATA_MISSING = "START_DATA_MISSING";
066 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
067
068 private String jobId = null;
069 private String actionId = null;
070 private WorkflowJobBean wfJob = null;
071 private WorkflowActionBean wfAction = null;
072 private JPAService jpaService = null;
073 private ActionExecutor executor = null;
074 private List<JsonBean> updateList = new ArrayList<JsonBean>();
075 private List<JsonBean> insertList = new ArrayList<JsonBean>();
076
077 public ActionStartXCommand(String actionId, String type) {
078 super("action.start", type, 0);
079 this.actionId = actionId;
080 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
081 }
082
083 @Override
084 protected boolean isLockRequired() {
085 return true;
086 }
087
088 @Override
089 public String getEntityKey() {
090 return this.jobId;
091 }
092
093 @Override
094 protected void loadState() throws CommandException {
095 try {
096 jpaService = Services.get().get(JPAService.class);
097 if (jpaService != null) {
098 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
099 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
100 LogUtils.setLogInfo(wfJob, logInfo);
101 LogUtils.setLogInfo(wfAction, logInfo);
102 }
103 else {
104 throw new CommandException(ErrorCode.E0610);
105 }
106 }
107 catch (XException ex) {
108 throw new CommandException(ex);
109 }
110 }
111
112 @Override
113 protected void verifyPrecondition() throws CommandException, PreconditionException {
114 if (wfJob == null) {
115 throw new PreconditionException(ErrorCode.E0604, jobId);
116 }
117 if (wfAction == null) {
118 throw new PreconditionException(ErrorCode.E0605, actionId);
119 }
120 if (wfAction.isPending()
121 && (wfAction.getStatus() == WorkflowActionBean.Status.PREP
122 || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
123 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
124 || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
125 )) {
126 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
127 throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
128 }
129 }
130 else {
131 throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr());
132 }
133
134 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
135 if (executor == null) {
136 throw new CommandException(ErrorCode.E0802, wfAction.getType());
137 }
138 }
139
140 @Override
141 protected Void execute() throws CommandException {
142
143 LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
144 Configuration conf = wfJob.getWorkflowInstance().getConf();
145
146 int maxRetries = 0;
147 long retryInterval = 0;
148
149 if (!(executor instanceof ControlNodeActionExecutor)) {
150 maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
151 retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
152 }
153
154 executor.setMaxRetries(maxRetries);
155 executor.setRetryInterval(retryInterval);
156
157 ActionExecutorContext context = null;
158 try {
159 boolean isRetry = false;
160 if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
161 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
162 isRetry = true;
163 prepareForRetry(wfAction);
164 }
165 boolean isUserRetry = false;
166 if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
167 isUserRetry = true;
168 prepareForRetry(wfAction);
169 }
170 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
171 boolean caught = false;
172 try {
173 if (!(executor instanceof ControlNodeActionExecutor)) {
174 String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
175 String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
176 wfAction.setConf(actionConf);
177 LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
178 .getType(), actionConf);
179 }
180 }
181 catch (ELEvaluationException ex) {
182 caught = true;
183 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
184 .getMessage(), ex);
185 }
186 catch (ELException ex) {
187 caught = true;
188 context.setErrorInfo(EL_ERROR, ex.getMessage());
189 LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
190 handleError(context, wfJob, wfAction);
191 }
192 catch (org.jdom.JDOMException je) {
193 caught = true;
194 context.setErrorInfo("ParsingError", je.getMessage());
195 LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
196 handleError(context, wfJob, wfAction);
197 }
198 catch (Exception ex) {
199 caught = true;
200 context.setErrorInfo(EL_ERROR, ex.getMessage());
201 LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
202 handleError(context, wfJob, wfAction);
203 }
204 if(!caught) {
205 wfAction.setErrorInfo(null, null);
206 incrActionCounter(wfAction.getType(), 1);
207
208 LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
209 wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
210 .getUserRetryInterval());
211
212 Instrumentation.Cron cron = new Instrumentation.Cron();
213 cron.start();
214 context.setStartTime();
215 executor.start(context, wfAction);
216 cron.stop();
217 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
218 addActionCron(wfAction.getType(), cron);
219
220 wfAction.setRetries(0);
221 if (wfAction.isExecutionComplete()) {
222 if (!context.isExecuted()) {
223 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
224 .getType());
225 wfAction.setErrorInfo(EXEC_DATA_MISSING,
226 "Execution Complete, but Execution Data Missing from Action");
227 failJob(context);
228 } else {
229 wfAction.setPending();
230 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
231 }
232 }
233 else {
234 if (!context.isStarted()) {
235 LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
236 .getType());
237 wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
238 failJob(context);
239 } else {
240 queue(new NotificationXCommand(wfJob, wfAction));
241 }
242 }
243
244 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
245
246 updateList.add(wfAction);
247 wfJob.setLastModifiedTime(new Date());
248 updateList.add(wfJob);
249 // Add SLA status event (STARTED) for WF_ACTION
250 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
251 SlaAppType.WORKFLOW_ACTION);
252 if(slaEvent != null) {
253 insertList.add(slaEvent);
254 }
255 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
256 }
257 }
258 catch (ActionExecutorException ex) {
259 LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
260 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
261 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
262 switch (ex.getErrorType()) {
263 case TRANSIENT:
264 if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
265 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
266 wfAction.setPendingAge(new Date());
267 wfAction.setRetries(0);
268 wfAction.setStartTime(null);
269 }
270 break;
271 case NON_TRANSIENT:
272 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
273 break;
274 case ERROR:
275 handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
276 WorkflowAction.Status.DONE);
277 break;
278 case FAILED:
279 try {
280 failJob(context);
281 // update coordinator action
282 new CoordActionUpdateXCommand(wfJob, 3).call();
283 new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
284 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
285 SlaAppType.WORKFLOW_ACTION);
286 if(slaEvent1 != null) {
287 insertList.add(slaEvent1);
288 }
289 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
290 SlaAppType.WORKFLOW_JOB);
291 if(slaEvent2 != null) {
292 insertList.add(slaEvent2);
293 }
294 }
295 catch (XException x) {
296 LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
297 }
298 break;
299 }
300 updateList.add(wfAction);
301 wfJob.setLastModifiedTime(new Date());
302 updateList.add(wfJob);
303 }
304 finally {
305 try {
306 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
307 if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
308 generateEvent(wfAction, wfJob.getUser());
309 }
310 }
311 catch (JPAExecutorException e) {
312 throw new CommandException(e);
313 }
314 }
315
316 LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
317
318 return null;
319 }
320
321 private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
322 throws CommandException {
323 failJob(context);
324 updateList.add(wfAction);
325 wfJob.setLastModifiedTime(new Date());
326 updateList.add(wfJob);
327 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
328 Status.FAILED, SlaAppType.WORKFLOW_ACTION);
329 if(slaEvent1 != null) {
330 insertList.add(slaEvent1);
331 }
332 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
333 Status.FAILED, SlaAppType.WORKFLOW_JOB);
334 if(slaEvent2 != null) {
335 insertList.add(slaEvent2);
336 }
337
338 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
339 return;
340 }
341
342 /* (non-Javadoc)
343 * @see org.apache.oozie.command.XCommand#getKey()
344 */
345 @Override
346 public String getKey(){
347 return getName() + "_" + actionId;
348 }
349
350 private void prepareForRetry(WorkflowActionBean wfAction) {
351 if (wfAction.getType().equals("map-reduce")) {
352 // need to delete child job id of original run
353 wfAction.setExternalChildIDs("");
354 }
355 }
356
357 }