This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.sql.Timestamp;
021 import java.util.Date;
022 import org.apache.oozie.ErrorCode;
023 import org.apache.oozie.WorkflowActionBean;
024 import org.apache.oozie.WorkflowJobBean;
025 import org.apache.oozie.XException;
026 import org.apache.oozie.action.ActionExecutor;
027 import org.apache.oozie.action.ActionExecutorException;
028 import org.apache.oozie.client.WorkflowJob;
029 import org.apache.oozie.client.WorkflowAction.Status;
030 import org.apache.oozie.command.CommandException;
031 import org.apache.oozie.command.PreconditionException;
032 import org.apache.oozie.executor.jpa.JPAExecutorException;
033 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
034 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
035 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
036 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
037 import org.apache.oozie.service.ActionService;
038 import org.apache.oozie.service.JPAService;
039 import org.apache.oozie.service.Services;
040 import org.apache.oozie.service.UUIDService;
041 import org.apache.oozie.util.InstrumentUtils;
042 import org.apache.oozie.util.Instrumentation;
043 import org.apache.oozie.util.LogUtils;
044 import org.apache.oozie.util.XLog;
045
046 /**
047 * Executes the check command for ActionHandlers. </p> Ensures the action is in
048 * RUNNING state before executing
049 * {@link ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
050 */
051 public class ActionCheckXCommand extends ActionXCommand<Void> {
052 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
053 private String actionId;
054 private String jobId;
055 private int actionCheckDelay;
056 private WorkflowJobBean wfJob = null;
057 private WorkflowActionBean wfAction = null;
058 private JPAService jpaService = null;
059 private ActionExecutor executor = null;
060
061 public ActionCheckXCommand(String actionId) {
062 this(actionId, -1);
063 }
064
065 public ActionCheckXCommand(String actionId, int priority, int checkDelay) {
066 super("action.check", "action.check", priority);
067 this.actionId = actionId;
068 this.actionCheckDelay = checkDelay;
069 this.jobId = Services.get().get(UUIDService.class).getId(actionId);
070 }
071
072 public ActionCheckXCommand(String actionId, int checkDelay) {
073 this(actionId, 0, checkDelay);
074 }
075
076 @Override
077 protected void eagerLoadState() throws CommandException {
078 try {
079 jpaService = Services.get().get(JPAService.class);
080 if (jpaService != null) {
081 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
082 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
083 LogUtils.setLogInfo(wfJob, logInfo);
084 LogUtils.setLogInfo(wfAction, logInfo);
085 }
086 else {
087 throw new CommandException(ErrorCode.E0610);
088 }
089 }
090 catch (XException ex) {
091 throw new CommandException(ex);
092 }
093 }
094
095 @Override
096 protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
097 if (wfJob == null) {
098 throw new PreconditionException(ErrorCode.E0604, jobId);
099 }
100 if (wfAction == null) {
101 throw new PreconditionException(ErrorCode.E0605, actionId);
102 }
103 // if the action has been updated, quit this command
104 if (actionCheckDelay > 0) {
105 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
106 Timestamp actionLmt = wfAction.getLastCheckTimestamp();
107 if (actionLmt.after(actionCheckTs)) {
108 throw new PreconditionException(ErrorCode.E0817, actionId);
109 }
110 }
111
112 executor = Services.get().get(ActionService.class).getExecutor(wfAction.getType());
113 if (executor == null) {
114 throw new CommandException(ErrorCode.E0802, wfAction.getType());
115 }
116 }
117
118 @Override
119 protected boolean isLockRequired() {
120 return true;
121 }
122
123 @Override
124 public String getEntityKey() {
125 return this.jobId;
126 }
127
128 @Override
129 protected void loadState() throws CommandException {
130 }
131
132 @Override
133 protected void verifyPrecondition() throws CommandException, PreconditionException {
134 if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) {
135 throw new PreconditionException(ErrorCode.E0815, wfAction.getPending(), wfAction.getStatusStr());
136 }
137 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) {
138 wfAction.setLastCheckTime(new Date());
139 try {
140 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
141 }
142 catch (JPAExecutorException e) {
143 throw new CommandException(e);
144 }
145 throw new PreconditionException(ErrorCode.E0818, wfAction.getId(), wfJob.getId(), wfJob.getStatus());
146 }
147 }
148
149 @Override
150 protected Void execute() throws CommandException {
151 LOG.debug("STARTED ActionCheckXCommand for wf actionId=" + actionId + " priority =" + getPriority());
152
153 ActionExecutorContext context = null;
154 try {
155 boolean isRetry = false;
156 boolean isUserRetry = false;
157 context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
158 incrActionCounter(wfAction.getType(), 1);
159
160 Instrumentation.Cron cron = new Instrumentation.Cron();
161 cron.start();
162 executor.check(context, wfAction);
163 cron.stop();
164 addActionCron(wfAction.getType(), cron);
165
166 if (wfAction.isExecutionComplete()) {
167 if (!context.isExecuted()) {
168 LOG.warn(XLog.OPS, "Action Completed, ActionExecutor [{0}] must call setExecutionData()", executor
169 .getType());
170 wfAction.setErrorInfo(EXEC_DATA_MISSING,
171 "Execution Complete, but Execution Data Missing from Action");
172 failJob(context);
173 wfAction.setLastCheckTime(new Date());
174 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
175 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
176 return null;
177 }
178 wfAction.setPending();
179 queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
180 }
181 wfAction.setLastCheckTime(new Date());
182 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
183 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
184 }
185 catch (ActionExecutorException ex) {
186 LOG.warn("Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(), ex
187 .getMessage(), ex);
188
189 wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
190
191 switch (ex.getErrorType()) {
192 case FAILED:
193 failAction(wfJob, wfAction);
194 break;
195 case ERROR:
196 handleUserRetry(wfAction);
197 break;
198 }
199 wfAction.setLastCheckTime(new Date());
200 try {
201 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
202 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
203 }
204 catch (JPAExecutorException e) {
205 throw new CommandException(e);
206 }
207 return null;
208 }
209 catch (JPAExecutorException e) {
210 throw new CommandException(e);
211 }
212
213 LOG.debug("ENDED ActionCheckXCommand for wf actionId=" + actionId + ", jobId=" + jobId);
214 return null;
215 }
216
217 private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException {
218 if (!handleUserRetry(action)) {
219 LOG.warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId());
220 action.resetPending();
221 action.setStatus(Status.FAILED);
222 workflow.setStatus(WorkflowJob.Status.FAILED);
223 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1, getInstrumentation());
224 }
225 }
226 }