This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.Date;
021
022 import javax.servlet.jsp.el.ELException;
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.oozie.ErrorCode;
025 import org.apache.oozie.FaultInjection;
026 import org.apache.oozie.WorkflowActionBean;
027 import org.apache.oozie.WorkflowJobBean;
028 import org.apache.oozie.XException;
029 import org.apache.oozie.action.ActionExecutor;
030 import org.apache.oozie.action.ActionExecutorException;
031 import org.apache.oozie.client.OozieClient;
032 import org.apache.oozie.client.WorkflowAction;
033 import org.apache.oozie.client.WorkflowJob;
034 import org.apache.oozie.client.SLAEvent.SlaAppType;
035 import org.apache.oozie.client.SLAEvent.Status;
036 import org.apache.oozie.command.CommandException;
037 import org.apache.oozie.command.PreconditionException;
038 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
039 import org.apache.oozie.executor.jpa.JPAExecutorException;
040 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
041 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
042 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
043 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
044 import org.apache.oozie.service.ActionService;
045 import org.apache.oozie.service.JPAService;
046 import org.apache.oozie.service.Services;
047 import org.apache.oozie.service.UUIDService;
048 import org.apache.oozie.util.ELEvaluationException;
049 import org.apache.oozie.util.Instrumentation;
050 import org.apache.oozie.util.LogUtils;
051 import org.apache.oozie.util.XLog;
052 import org.apache.oozie.util.XmlUtils;
053 import org.apache.oozie.util.db.SLADbXOperations;
054
055 public class ActionStartXCommand extends ActionXCommand<Void> {
056 public static final String EL_ERROR = "EL_ERROR";
057 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
058 public static final String COULD_NOT_START = "COULD_NOT_START";
059 public static final String START_DATA_MISSING = "START_DATA_MISSING";
060 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
061
062 private String jobId = null;
063 private String actionId = null;
064 private WorkflowJobBean wfJob = null;
065 private WorkflowActionBean wfAction = null;
066 private JPAService jpaService = null;
067 private ActionExecutor executor = null;
068
069 public ActionStartXCommand(String actionId, String type) {
070 super("action.start", type, 0);
071 this.actionId = actionId;
072 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
073 }
074
075 @Override
076 protected boolean isLockRequired() {
077 return true;
078 }
079
080 @Override
081 public String getEntityKey() {
082 return this.jobId;
083 }
084
085 @Override
086 protected void loadState() throws CommandException {
087 try {
088 jpaService = Services.get().get(JPAService.class);
089 if (jpaService != null) {
090 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
091 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
092 LogUtils.setLogInfo(wfJob, logInfo);
093 LogUtils.setLogInfo(wfAction, logInfo);
094 }
095 else {
096 throw new CommandException(ErrorCode.E0610);
097 }
098 }
099 catch (XException ex) {
100 throw new CommandException(ex);
101 }
102 }
103
104 @Override
105 protected void verifyPrecondition() throws CommandException, PreconditionException {
106 if (wfJob == null) {
107 throw new PreconditionException(ErrorCode.E0604, jobId);
108 }
109 if (wfAction == null) {
110 throw new PreconditionException(ErrorCode.E0605, actionId);
111 }
112 if (wfAction.isPending()
113 && (wfAction.getStatus() == WorkflowActionBean.Status.PREP
114 || wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
115 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL
116 || wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY
117 )) {
118 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
119 throw new PreconditionException(ErrorCode.E0810, WorkflowJob.Status.RUNNING.toString());
120 }
121 }
122 else {
123 throw new PreconditionException(ErrorCode.E0816, wfAction.getPending(), wfAction.getStatusStr());
124 }
125
126 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
127 if (executor == null) {
128 throw new CommandException(ErrorCode.E0802, wfAction.getType());
129 }
130 }
131
132 @Override
133 protected Void execute() throws CommandException {
134
135 LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
136 Configuration conf = wfJob.getWorkflowInstance().getConf();
137
138 int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
139 long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
140 executor.setMaxRetries(maxRetries);
141 executor.setRetryInterval(retryInterval);
142
143 ActionExecutorContext context = null;
144 try {
145 boolean isRetry = false;
146 if (wfAction.getStatus() == WorkflowActionBean.Status.START_RETRY
147 || wfAction.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
148 isRetry = true;
149 }
150 boolean isUserRetry = false;
151 if (wfAction.getStatus() == WorkflowActionBean.Status.USER_RETRY) {
152 isUserRetry = true;
153 }
154 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
155 try {
156 String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
157 String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
158 wfAction.setConf(actionConf);
159 LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
160 .getType(), actionConf);
161 }
162 catch (ELEvaluationException ex) {
163 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
164 .getMessage(), ex);
165 }
166 catch (ELException ex) {
167 context.setErrorInfo(EL_ERROR, ex.getMessage());
168 LOG.warn("ELException in ActionStartXCommand ", ex.getMessage(), ex);
169 handleError(context, wfJob, wfAction);
170 return null;
171 }
172 catch (org.jdom.JDOMException je) {
173 context.setErrorInfo("ParsingError", je.getMessage());
174 LOG.warn("JDOMException in ActionStartXCommand ", je.getMessage(), je);
175 handleError(context, wfJob, wfAction);
176 return null;
177 }
178 catch (Exception ex) {
179 context.setErrorInfo(EL_ERROR, ex.getMessage());
180 LOG.warn("Exception in ActionStartXCommand ", ex.getMessage(), ex);
181 handleError(context, wfJob, wfAction);
182 return null;
183 }
184 wfAction.setErrorInfo(null, null);
185 incrActionCounter(wfAction.getType(), 1);
186
187 LOG.info("Start action [{0}] with user-retry state : userRetryCount [{1}], userRetryMax [{2}], userRetryInterval [{3}]",
188 wfAction.getId(), wfAction.getUserRetryCount(), wfAction.getUserRetryMax(), wfAction
189 .getUserRetryInterval());
190
191 Instrumentation.Cron cron = new Instrumentation.Cron();
192 cron.start();
193 context.setStartTime();
194 executor.start(context, wfAction);
195 cron.stop();
196 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
197 addActionCron(wfAction.getType(), cron);
198
199 wfAction.setRetries(0);
200 if (wfAction.isExecutionComplete()) {
201 if (!context.isExecuted()) {
202 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
203 .getType());
204 wfAction.setErrorInfo(EXEC_DATA_MISSING,
205 "Execution Complete, but Execution Data Missing from Action");
206 failJob(context);
207 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
208 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
209 return null;
210 }
211 wfAction.setPending();
212 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
213 }
214 else {
215 if (!context.isStarted()) {
216 LOG.warn(XLog.OPS, "Action Started, ActionExecutor [{0}] must call setStartData()", executor
217 .getType());
218 wfAction.setErrorInfo(START_DATA_MISSING, "Execution Started, but Start Data Missing from Action");
219 failJob(context);
220 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
221 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
222 return null;
223 }
224 queue(new NotificationXCommand(wfJob, wfAction));
225 }
226
227 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action status=" + wfAction.getStatusStr());
228
229 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
230 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
231 // Add SLA status event (STARTED) for WF_ACTION
232 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.STARTED,
233 SlaAppType.WORKFLOW_ACTION);
234 LOG.warn(XLog.STD, "[***" + wfAction.getId() + "***]" + "Action updated in DB!");
235
236 }
237 catch (ActionExecutorException ex) {
238 LOG.warn("Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
239 wfAction.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
240 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
241 switch (ex.getErrorType()) {
242 case TRANSIENT:
243 if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
244 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
245 wfAction.setPendingAge(new Date());
246 wfAction.setRetries(0);
247 wfAction.setStartTime(null);
248 }
249 break;
250 case NON_TRANSIENT:
251 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
252 break;
253 case ERROR:
254 handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
255 WorkflowAction.Status.DONE);
256 break;
257 case FAILED:
258 try {
259 failJob(context);
260 // update coordinator action
261 new CoordActionUpdateXCommand(wfJob, 3).call();
262 new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
263 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
264 SlaAppType.WORKFLOW_ACTION);
265 SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
266 SlaAppType.WORKFLOW_JOB);
267 }
268 catch (XException x) {
269 LOG.warn("ActionStartXCommand - case:FAILED ", x.getMessage());
270 }
271 break;
272 }
273 try {
274 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
275 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
276 }
277 catch (JPAExecutorException je) {
278 throw new CommandException(je);
279 }
280 }
281 catch (JPAExecutorException je) {
282 throw new CommandException(je);
283 }
284
285 LOG.debug("ENDED ActionStartXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
286
287 return null;
288 }
289
290 private void handleError(ActionExecutorContext context, WorkflowJobBean workflow, WorkflowActionBean action)
291 throws CommandException {
292 failJob(context);
293 try {
294 jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
295 jpaService.execute(new WorkflowJobUpdateJPAExecutor(workflow));
296 }
297 catch (JPAExecutorException je) {
298 throw new CommandException(je);
299 }
300 SLADbXOperations.writeStausEvent(action.getSlaXml(), action.getId(), Status.FAILED, SlaAppType.WORKFLOW_ACTION);
301 SLADbXOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), Status.FAILED, SlaAppType.WORKFLOW_JOB);
302 // update coordinator action
303 new CoordActionUpdateXCommand(workflow, 3).call();
304 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
305 return;
306 }
307
308 /* (non-Javadoc)
309 * @see org.apache.oozie.command.XCommand#getKey()
310 */
311 @Override
312 public String getKey(){
313 return getName() + "_" + actionId;
314 }
315
316 }