This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.util.Date;
021
022 import javax.servlet.jsp.el.ELException;
023
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.FaultInjection;
027 import org.apache.oozie.WorkflowActionBean;
028 import org.apache.oozie.WorkflowJobBean;
029 import org.apache.oozie.XException;
030 import org.apache.oozie.action.ActionExecutor;
031 import org.apache.oozie.action.ActionExecutorException;
032 import org.apache.oozie.client.OozieClient;
033 import org.apache.oozie.client.WorkflowAction;
034 import org.apache.oozie.client.WorkflowJob;
035 import org.apache.oozie.client.SLAEvent.SlaAppType;
036 import org.apache.oozie.client.SLAEvent.Status;
037 import org.apache.oozie.command.CommandException;
038 import org.apache.oozie.command.coord.CoordActionUpdateCommand;
039 import org.apache.oozie.service.ActionService;
040 import org.apache.oozie.service.Services;
041 import org.apache.oozie.service.UUIDService;
042 import org.apache.oozie.store.StoreException;
043 import org.apache.oozie.store.WorkflowStore;
044 import org.apache.oozie.util.ELEvaluationException;
045 import org.apache.oozie.util.Instrumentation;
046 import org.apache.oozie.util.XLog;
047 import org.apache.oozie.util.XmlUtils;
048 import org.apache.oozie.util.db.SLADbOperations;
049
050 public class ActionStartCommand extends ActionCommand<Void> {
051 public static final String EL_ERROR = "EL_ERROR";
052 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
053 public static final String COULD_NOT_START = "COULD_NOT_START";
054 public static final String START_DATA_MISSING = "START_DATA_MISSING";
055 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
056
057 private String id;
058 private String jobId;
059
060 public ActionStartCommand(String id, String type) {
061 super("action.start", type, 0);
062 this.id = id;
063 }
064
065 @Override
066 protected Void call(WorkflowStore store) throws StoreException, CommandException {
067 WorkflowJobBean workflow = store.getWorkflow(jobId, false);
068 setLogInfo(workflow);
069 WorkflowActionBean action = store.getAction(id, false);
070 XLog.getLog(getClass()).warn(XLog.STD,
071 "[***" + action.getId() + "***]" + "In call()....status=" + action.getStatusStr());
072 setLogInfo(action);
073 if (action.isPending()
074 && (action.getStatus() == WorkflowActionBean.Status.PREP
075 || action.getStatus() == WorkflowActionBean.Status.START_RETRY || action.getStatus() == WorkflowActionBean.Status.START_MANUAL)) {
076 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
077
078 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
079 Configuration conf = workflow.getWorkflowInstance().getConf();
080
081 int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
082 long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
083 executor.setMaxRetries(maxRetries);
084 executor.setRetryInterval(retryInterval);
085
086 if (executor != null) {
087 ActionExecutorContext context = null;
088 try {
089 boolean isRetry = false;
090 if (action.getStatus() == WorkflowActionBean.Status.START_RETRY
091 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
092 isRetry = true;
093 }
094 context = new ActionCommand.ActionExecutorContext(workflow, action, isRetry);
095 try {
096 String tmpActionConf = XmlUtils.removeComments(action.getConf());
097 String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
098 action.setConf(actionConf);
099
100 XLog.getLog(getClass()).debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}",
101 action.getName(), action.getType(), actionConf);
102
103 }
104 catch (ELEvaluationException ex) {
105 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT,
106 EL_EVAL_ERROR, ex.getMessage(), ex);
107 }
108 catch (ELException ex) {
109 context.setErrorInfo(EL_ERROR, ex.getMessage());
110 XLog.getLog(getClass()).warn("ELException in ActionStartCommand ", ex.getMessage(), ex);
111 handleError(context, store, workflow, action);
112 return null;
113 }
114 catch (org.jdom.JDOMException je) {
115 context.setErrorInfo("ParsingError", je.getMessage());
116 XLog.getLog(getClass()).warn("JDOMException in ActionStartCommand ", je.getMessage(), je);
117 handleError(context, store, workflow, action);
118 return null;
119 }
120 catch (Exception ex) {
121 context.setErrorInfo(EL_ERROR, ex.getMessage());
122 XLog.getLog(getClass()).warn("Exception in ActionStartCommand ", ex.getMessage(), ex);
123 handleError(context, store, workflow, action);
124 return null;
125 }
126 action.setErrorInfo(null, null);
127 incrActionCounter(action.getType(), 1);
128
129 Instrumentation.Cron cron = new Instrumentation.Cron();
130 cron.start();
131 executor.start(context, action);
132 cron.stop();
133 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
134 addActionCron(action.getType(), cron);
135
136 action.setRetries(0);
137 if (action.isExecutionComplete()) {
138 if (!context.isExecuted()) {
139 XLog.getLog(getClass()).warn(XLog.OPS,
140 "Action Completed, ActionExecutor [{0}] must call setExecutionData()",
141 executor.getType());
142 action.setErrorInfo(EXEC_DATA_MISSING,
143 "Execution Complete, but Execution Data Missing from Action");
144 failJob(context);
145 store.updateAction(action);
146 store.updateWorkflow(workflow);
147 return null;
148 }
149 action.setPending();
150 queueCallable(new ActionEndCommand(action.getId(), action.getType()));
151 }
152 else {
153 if (!context.isStarted()) {
154 XLog.getLog(getClass()).warn(XLog.OPS,
155 "Action Started, ActionExecutor [{0}] must call setStartData()",
156 executor.getType());
157 action.setErrorInfo(START_DATA_MISSING,
158 "Execution Started, but Start Data Missing from Action");
159 failJob(context);
160 store.updateAction(action);
161 store.updateWorkflow(workflow);
162 return null;
163 }
164 queueCallable(new NotificationCommand(workflow, action));
165 }
166
167 XLog.getLog(getClass()).warn(XLog.STD,
168 "[***" + action.getId() + "***]" + "Action status=" + action.getStatusStr());
169
170 store.updateAction(action);
171 store.updateWorkflow(workflow);
172 // Add SLA status event (STARTED) for WF_ACTION
173 // SLADbOperations.writeSlaStatusEvent(eSla,
174 // action.getId(), Status.STARTED, store);
175 SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, Status.STARTED,
176 SlaAppType.WORKFLOW_ACTION);
177 XLog.getLog(getClass()).warn(XLog.STD,
178 "[***" + action.getId() + "***]" + "Action updated in DB!");
179
180 }
181 catch (ActionExecutorException ex) {
182 XLog.getLog(getClass()).warn(
183 "Error starting action [{0}]. ErrorType [{1}], ErrorCode [{2}], Message [{3}]",
184 action.getName(), ex.getErrorType(), ex.getErrorCode(), ex.getMessage(), ex);
185 action.setErrorInfo(ex.getErrorCode(), ex.getMessage());
186 switch (ex.getErrorType()) {
187 case TRANSIENT:
188 if (!handleTransient(context, executor, WorkflowAction.Status.START_RETRY)) {
189 handleNonTransient(store, context, executor, WorkflowAction.Status.START_MANUAL);
190 action.setPendingAge(new Date());
191 action.setRetries(0);
192 action.setStartTime(null);
193 }
194 break;
195 case NON_TRANSIENT:
196 handleNonTransient(store, context, executor, WorkflowAction.Status.START_MANUAL);
197 break;
198 case ERROR:
199 handleError(context, executor, WorkflowAction.Status.ERROR.toString(), true,
200 WorkflowAction.Status.DONE);
201 break;
202 case FAILED:
203 try {
204 failJob(context);
205 queueCallable(new CoordActionUpdateCommand(workflow));
206 SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store,
207 Status.FAILED, SlaAppType.WORKFLOW_ACTION);
208 SLADbOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), store,
209 Status.FAILED, SlaAppType.WORKFLOW_JOB);
210 }
211 catch (XException x) {
212 XLog.getLog(getClass()).warn("ActionStartCommand - case:FAILED ", x.getMessage());
213 }
214 break;
215 }
216 store.updateAction(action);
217 store.updateWorkflow(workflow);
218 }
219 }
220 else {
221 throw new CommandException(ErrorCode.E0802, action.getType());
222 }
223
224 }
225 else {
226 XLog.getLog(getClass()).warn("Job state is not {0}. Skipping Action Execution",
227 WorkflowJob.Status.RUNNING.toString());
228 }
229 }
230 return null;
231 }
232
233 private void handleError(ActionExecutorContext context, WorkflowStore store, WorkflowJobBean workflow,
234 WorkflowActionBean action) throws CommandException, StoreException {
235 failJob(context);
236 store.updateAction(action);
237 store.updateWorkflow(workflow);
238 SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, Status.FAILED,
239 SlaAppType.WORKFLOW_ACTION);
240 SLADbOperations.writeStausEvent(workflow.getSlaXml(), workflow.getId(), store, Status.FAILED,
241 SlaAppType.WORKFLOW_JOB);
242 queueCallable(new CoordActionUpdateCommand(workflow));
243 return;
244 }
245
246 @Override
247 protected Void execute(WorkflowStore store) throws CommandException, StoreException {
248 try {
249 XLog.getLog(getClass()).debug("STARTED ActionStartCommand for wf actionId=" + id);
250 jobId = Services.get().get(UUIDService.class).getId(id);
251 if (lock(jobId)) {
252 call(store);
253 }
254 else {
255 queueCallable(new ActionStartCommand(id, getType()), LOCK_FAILURE_REQUEUE_INTERVAL);
256 XLog.getLog(getClass()).warn("ActionStartCommand lock was not acquired - failed {0}", id);
257 }
258 }
259 catch (InterruptedException e) {
260 queueCallable(new ActionStartCommand(id, getType()), LOCK_FAILURE_REQUEUE_INTERVAL);
261 XLog.getLog(getClass()).warn("ActionStartCommand lock was not acquired - interrupted exception failed {0}",
262 id);
263 }
264 XLog.getLog(getClass()).debug("ENDED ActionStartCommand for wf actionId=" + id + ", jobId=" + jobId);
265 return null;
266 }
267
268 }