This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.sql.Timestamp;
021 import java.util.ArrayList;
022 import java.util.Date;
023 import java.util.List;
024
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.WorkflowActionBean;
027 import org.apache.oozie.WorkflowJobBean;
028 import org.apache.oozie.XException;
029 import org.apache.oozie.action.ActionExecutor;
030 import org.apache.oozie.action.ActionExecutorException;
031 import org.apache.oozie.client.WorkflowAction;
032 import org.apache.oozie.client.WorkflowJob;
033 import org.apache.oozie.client.WorkflowAction.Status;
034 import org.apache.oozie.client.rest.JsonBean;
035 import org.apache.oozie.command.CommandException;
036 import org.apache.oozie.command.PreconditionException;
037 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
038 import org.apache.oozie.executor.jpa.JPAExecutorException;
039 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
040 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
041 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
042 import org.apache.oozie.service.ActionCheckerService;
043 import org.apache.oozie.service.ActionService;
044 import org.apache.oozie.service.JPAService;
045 import org.apache.oozie.service.Services;
046 import org.apache.oozie.service.UUIDService;
047 import org.apache.oozie.util.InstrumentUtils;
048 import org.apache.oozie.util.Instrumentation;
049 import org.apache.oozie.util.LogUtils;
050 import org.apache.oozie.util.XLog;
051
052 /**
053 * Executes the check command for ActionHandlers. </p> Ensures the action is in
054 * RUNNING state before executing
055 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
056 */
057 public class ActionCheckXCommand extends ActionXCommand<Void> {
058 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
059 private String actionId;
060 private String jobId;
061 private int actionCheckDelay;
062 private WorkflowJobBean wfJob = null;
063 private WorkflowActionBean wfAction = null;
064 private JPAService jpaService = null;
065 private ActionExecutor executor = null;
066 private List<JsonBean> updateList = new ArrayList<JsonBean>();
067
068 public ActionCheckXCommand(String actionId) {
069 this(actionId, -1);
070 }
071
072 public ActionCheckXCommand(String actionId, int priority, int checkDelay) {
073 super("action.check", "action.check", priority);
074 this.actionId = actionId;
075 this.actionCheckDelay = checkDelay;
076 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
077 }
078
079 public ActionCheckXCommand(String actionId, int checkDelay) {
080 this(actionId, 0, checkDelay);
081 }
082
083 @Override
084 protected void eagerLoadState() throws CommandException {
085 try {
086 jpaService = Services.get().get(JPAService.class);
087 if (jpaService != null) {
088 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
089 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
090 LogUtils.setLogInfo(wfJob, logInfo);
091 LogUtils.setLogInfo(wfAction, logInfo);
092 }
093 else {
094 throw new CommandException(ErrorCode.E0610);
095 }
096 }
097 catch (XException ex) {
098 throw new CommandException(ex);
099 }
100 }
101
102 @Override
103 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
104 if (wfJob == null) {
105 throw new PreconditionException(ErrorCode.E0604, jobId);
106 }
107 if (wfAction == null) {
108 throw new PreconditionException(ErrorCode.E0605, actionId);
109 }
110 // if the action has been updated, quit this command
111 if (actionCheckDelay > 0) {
112 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
113 Timestamp actionLmt = wfAction.getLastCheckTimestamp();
114 if (actionLmt.after(actionCheckTs)) {
115 throw new PreconditionException(ErrorCode.E0817, actionId);
116 }
117 }
118
119 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
120 if (executor == null) {
121 throw new CommandException(ErrorCode.E0802, wfAction.getType());
122 }
123 }
124
125 @Override
126 protected boolean isLockRequired() {
127 return true;
128 }
129
130 @Override
131 public String getEntityKey() {
132 return this.jobId;
133 }
134
135 @Override
136 protected void loadState() throws CommandException {
137 }
138
139 @Override
140 protected void verifyPrecondition() throws CommandException, PreconditionException {
141 if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
142 throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr());
143 }
144 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
145 wfAction.setLastCheckTime(new Date());
146 try {
147 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
148 }
149 catch (JPAExecutorException e) {
150 throw new CommandException(e);
151 }
152 throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus());
153 }
154 }
155
156 @Override
157 protected Void execute() throws CommandException {
158 LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority());
159
160 long retryInterval = Services.get().getConf().getLong(ActionCheckerService.CONF_ACTION_CHECK_INTERVAL, executor
161 .getRetryInterval());
162 executor.setRetryInterval(retryInterval);
163
164 ActionExecutorContext context = null;
165 try {
166 boolean isRetry = false;
167 if (wfAction.getRetries() > 0) {
168 isRetry = true;
169 }
170 boolean isUserRetry = false;
171 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
172 incrActionCounter(wfAction.getType(), 1);
173
174 Instrumentation.Cron cron = new Instrumentation.Cron();
175 cron.start();
176 executor.check(context, wfAction);
177 cron.stop();
178 addActionCron(wfAction.getType(), cron);
179
180 if (wfAction.isExecutionComplete()) {
181 if (!context.isExecuted()) {
182 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
183 .getType());
184 wfAction.setErrorInfo(EXEC_DATA_MISSING,
185 "Execution Complete, but Execution Data Missing from Action");
186 failJob(context);
187 } else {
188 wfAction.setPending();
189 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
190 }
191 }
192 wfAction.setLastCheckTime(new Date());
193 updateList.add(wfAction);
194 wfJob.setLastModifiedTime(new Date());
195 updateList.add(wfJob);
196 }
197 catch (ActionExecutorException ex) {
198 LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
199 .getMessage(), ex);
200
201 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
202
203 switch (ex.getErrorType()) {
204 case FAILED:
205 failAction(wfJob, wfAction);
206 break;
207 case ERROR:
208 handleUserRetry(wfAction);
209 break;
210 case TRANSIENT: // retry N times, then suspend workflow
211 if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
212 handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
213 wfAction.setPendingAge(new Date());
214 wfAction.setRetries(0);
215 wfAction.setStartTime(null);
216 }
217 break;
218 }
219 wfAction.setLastCheckTime(new Date());
220 updateList = new ArrayList<JsonBean>();
221 updateList.add(wfAction);
222 wfJob.setLastModifiedTime(new Date());
223 updateList.add(wfJob);
224 }
225 finally {
226 try {
227 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
228 }
229 catch (JPAExecutorException e) {
230 throw new CommandException(e);
231 }
232 }
233
234 LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
235 return null;
236 }
237
238 private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException {
239 if (!handleUserRetry(action)) {
240 LOG.warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId());
241 action.resetPending();
242 action.setStatus(Status.FAILED);
243 workflow.setStatus(WorkflowJob.Status.FAILED);
244 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
245 }
246 }
247
248 protected long getRetryInterval() {
249 return (executor != null) ? executor.getRetryInterval() : ActionExecutor.RETRY_INTERVAL;
250 }
251
252 }