This project has retired. For details, please refer to its
Apache Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.ArrayList;
021 import java.util.Date;
022 import java.util.List;
023
024 import javax.servlet.jsp.el.ELException;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.FaultInjection;
028 import org.apache.oozie.SLAEventBean;
029 import org.apache.oozie.WorkflowActionBean;
030 import org.apache.oozie.WorkflowJobBean;
031 import org.apache.oozie.XException;
032 import org.apache.oozie.action.ActionExecutor;
033 import org.apache.oozie.action.ActionExecutorException;
034 import org.apache.oozie.action.control.ControlNodeActionExecutor;
035 import org.apache.oozie.client.OozieClient;
036 import org.apache.oozie.client.WorkflowAction;
037 import org.apache.oozie.client.WorkflowJob;
038 import org.apache.oozie.client.SLAEvent.SlaAppType;
039 import org.apache.oozie.client.SLAEvent.Status;
040 import org.apache.oozie.client.rest.JsonBean;
041 import org.apache.oozie.command.CommandException;
042 import org.apache.oozie.command.PreconditionException;
043 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
044 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
045 import org.apache.oozie.executor.jpa.JPAExecutorException;
046 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
047 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
048 import org.apache.oozie.service.ActionService;
049 import org.apache.oozie.service.JPAService;
050 import org.apache.oozie.service.Services;
051 import org.apache.oozie.service.UUIDService;
052 import org.apache.oozie.util.ELEvaluationException;
053 import org.apache.oozie.util.Instrumentation;
054 import org.apache.oozie.util.LogUtils;
055 import org.apache.oozie.util.XLog;
056 import org.apache.oozie.util.XmlUtils;
057 import org.apache.oozie.util.db.SLADbXOperations;
058
059 public class ActionStartXCommand extends ActionXCommand<Void> {
060 public static final String EL_ERROR = "EL_ERROR";
061 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
062 public static final String COULD_NOT_START = "COULD_NOT_START";
063 public static final String START_DATA_MISSING = "START_DATA_MISSING";
064 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
065
066 private String jobId = null;
067 private String actionId = null;
068 private WorkflowJobBean wfJob = null;
069 private WorkflowActionBean wfAction = null;
070 private JPAService jpaService = null;
071 private ActionExecutor executor = null;
072 private List<JsonBean> updateList = new ArrayList<JsonBean>();
073 private List<JsonBean> insertList = new ArrayList<JsonBean>();
074
075 public ActionStartXCommand(String actionId, String type) {
076 super("action.start", type, 0);
077 this.actionId = actionId;
078 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
079 }
080
081 @Override
082 protected boolean isLockRequired() {
083 return true;
084 }
085
086 @Override
087 public String getEntityKey() {
088 return this.jobId;
089 }
090
091 @Override
092 protected void loadState() throws CommandException {
093 try {
094 jpaService = Services.get().get(JPAService.class);
095 if (jpaService != null) {
096 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
097 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
098 LogUtils.setLogInfo(wfJob, logInfo);
099 LogUtils.setLogInfo(wfAction, logInfo);
100 }
101 else {
102 throw new CommandException(ErrorCode.E0610);
103 }
104 }
105 catch (XException ex) {
106 throw new CommandException(ex);
107 }
108 }
109
110 @Override
111 protected void verifyPrecondition() throws CommandException, PreconditionException {
112 if (wfJob == null) {
113 throw new PreconditionException(ErrorCode.E0604, jobId);
114 }
115 if (wfAction == null) {
116 throw new PreconditionException(ErrorCode.E0605, actionId);
117 }
118 if (wfAction.isPending()
119 && (wfAction.getStatus() == WorkflowActionBean.Status.PREP
120 || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
121 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
122 || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
123 )) {
124 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
125 throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
126 }
127 }
128 else {
129 throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr());
130 }
131
132 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
133 if (executor == null) {
134 throw new CommandException(ErrorCode.E0802, wfAction.getType());
135 }
136 }
137
138 @Override
139 protected Void execute() throws CommandException {
140
141 LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
142 Configuration conf = wfJob.getWorkflowInstance().getConf();
143
144 int maxRetries = 0;
145 long retryInterval = 0;
146
147 if (!(executor instanceof ControlNodeActionExecutor)) {
148 maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
149 retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
150 }
151
152 executor.setMaxRetries(maxRetries);
153 executor.setRetryInterval(retryInterval);
154
155 ActionExecutorContext context = null;
156 try {
157 boolean isRetry = false;
158 if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
159 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
160 isRetry = true;
161 }
162 boolean isUserRetry = false;
163 if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
164 isUserRetry = true;
165 }
166 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
167 boolean caught = false;
168 try {
169 if (!(executor instanceof ControlNodeActionExecutor)) {
170 String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
171 String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
172 wfAction.setConf(actionConf);
173 LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
174 .getType(), actionConf);
175 }
176 }
177 catch (ELEvaluationException ex) {
178 caught = true;
179 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
180 .getMessage(), ex);
181 }
182 catch (ELException ex) {
183 caught = true;
184 context.setErrorInfo(EL_ERROR, ex.getMessage());
185 LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
186 handleError(context, wfJob, wfAction);
187 }
188 catch (org.jdom.JDOMException je) {
189 caught = true;
190 context.setErrorInfo("ParsingError", je.getMessage());
191 LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
192 handleError(context, wfJob, wfAction);
193 }
194 catch (Exception ex) {
195 caught = true;
196 context.setErrorInfo(EL_ERROR, ex.getMessage());
197 LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
198 handleError(context, wfJob, wfAction);
199 }
200 if(!caught) {
201 wfAction.setErrorInfo(null, null);
202 incrActionCounter(wfAction.getType(), 1);
203
204 LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
205 wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
206 .getUserRetryInterval());
207
208 Instrumentation.Cron cron = new Instrumentation.Cron();
209 cron.start();
210 context.setStartTime();
211 executor.start(context, wfAction);
212 cron.stop();
213 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
214 addActionCron(wfAction.getType(), cron);
215
216 wfAction.setRetries(0);
217 if (wfAction.isExecutionComplete()) {
218 if (!context.isExecuted()) {
219 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
220 .getType());
221 wfAction.setErrorInfo(EXEC_DATA_MISSING,
222 "Execution Complete, but Execution Data Missing from Action");
223 failJob(context);
224 } else {
225 wfAction.setPending();
226 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
227 }
228 }
229 else {
230 if (!context.isStarted()) {
231 LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
232 .getType());
233 wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
234 failJob(context);
235 } else {
236 queue(new NotificationXCommand(wfJob, wfAction));
237 }
238 }
239
240 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
241
242 updateList.add(wfAction);
243 wfJob.setLastModifiedTime(new Date());
244 updateList.add(wfJob);
245 // Add SLA status event (STARTED) for WF_ACTION
246 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
247 SlaAppType.WORKFLOW_ACTION);
248 if(slaEvent != null) {
249 insertList.add(slaEvent);
250 }
251 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
252 }
253 }
254 catch (ActionExecutorException ex) {
255 LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
256 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
257 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
258 switch (ex.getErrorType()) {
259 case TRANSIENT:
260 if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
261 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
262 wfAction.setPendingAge(new Date());
263 wfAction.setRetries(0);
264 wfAction.setStartTime(null);
265 }
266 break;
267 case NON_TRANSIENT:
268 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
269 break;
270 case ERROR:
271 handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
272 WorkflowAction.Status.DONE);
273 break;
274 case FAILED:
275 try {
276 failJob(context);
277 // update coordinator action
278 new CoordActionUpdateXCommand(wfJob, 3).call();
279 new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
280 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
281 SlaAppType.WORKFLOW_ACTION);
282 if(slaEvent1 != null) {
283 insertList.add(slaEvent1);
284 }
285 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
286 SlaAppType.WORKFLOW_JOB);
287 if(slaEvent2 != null) {
288 insertList.add(slaEvent2);
289 }
290 }
291 catch (XException x) {
292 LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
293 }
294 break;
295 }
296 updateList.add(wfAction);
297 wfJob.setLastModifiedTime(new Date());
298 updateList.add(wfJob);
299 }
300 finally {
301 try {
302 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
303 }
304 catch (JPAExecutorException e) {
305 throw new CommandException(e);
306 }
307 }
308
309 LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
310
311 return null;
312 }
313
314 private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
315 throws CommandException {
316 failJob(context);
317 updateList.add(wfAction);
318 wfJob.setLastModifiedTime(new Date());
319 updateList.add(wfJob);
320 SLAEventBean slaEvent1 = SLADbXOperations.createStatusEvent(action.getSlaXml(), action.getId(),
321 Status.FAILED, SlaAppType.WORKFLOW_ACTION);
322 if(slaEvent1 != null) {
323 insertList.add(slaEvent1);
324 }
325 SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(workflow.getSlaXml(), workflow.getId(),
326 Status.FAILED, SlaAppType.WORKFLOW_JOB);
327 if(slaEvent2 != null) {
328 insertList.add(slaEvent2);
329 }
330 // update coordinator action
331 new CoordActionUpdateXCommand(workflow, 3).call();
332 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
333 return;
334 }
335
336 /* (non-Javadoc)
337 * @see org.apache.oozie.command.XCommand#getKey()
338 */
339 @Override
340 public String getKey(){
341 return getName() + "_" + actionId;
342 }
343
344 }