This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.sql.Timestamp;
021 import java.util.Date;
022
023 import org.apache.oozie.WorkflowActionBean;
024 import org.apache.oozie.WorkflowJobBean;
025 import org.apache.oozie.action.ActionExecutor;
026 import org.apache.oozie.action.ActionExecutorException;
027 import org.apache.oozie.client.WorkflowJob;
028 import org.apache.oozie.client.WorkflowAction.Status;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.service.ActionService;
031 import org.apache.oozie.service.Services;
032 import org.apache.oozie.service.UUIDService;
033 import org.apache.oozie.store.StoreException;
034 import org.apache.oozie.store.WorkflowStore;
035 import org.apache.oozie.util.Instrumentation;
036 import org.apache.oozie.util.XLog;
037
038 /**
039 * Executes the check command for ActionHandlers. </p> Ensures the action is in RUNNING state before executing {@link
040 * ActionExecutor#check(org.apache.oozie.action.ActionExecutor.Context, org.apache.oozie.client.WorkflowAction)}
041 */
042 public class ActionCheckCommand extends ActionCommand<Void> {
043 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
044 private String id;
045 private String jobId;
046 private int actionCheckDelay;
047
048 public ActionCheckCommand(String id) {
049 this(id, -1);
050 }
051
052 public ActionCheckCommand(String id, int priority, int checkDelay) {
053 super("action.check", "action.check", priority);
054 this.id = id;
055 this.actionCheckDelay = checkDelay;
056 }
057
058 public ActionCheckCommand(String id, int checkDelay) {
059 this(id, 0, checkDelay);
060 }
061
062 @Override
063 protected Void call(WorkflowStore store) throws StoreException, CommandException {
064
065 // String jobId = Services.get().get(UUIDService.class).getId(id);
066 WorkflowJobBean workflow = store.getWorkflow(jobId, false);
067 setLogInfo(workflow);
068 WorkflowActionBean action = store.getAction(id, false);
069 setLogInfo(action);
070 if (action.isPending() && action.getStatus() == WorkflowActionBean.Status.RUNNING) {
071 setLogInfo(workflow);
072 // if the action has been updated, quit this command
073 if (actionCheckDelay > 0) {
074 Timestamp actionCheckTs = new Timestamp(System.currentTimeMillis() - actionCheckDelay * 1000);
075 Timestamp actionLmt = action.getLastCheckTimestamp();
076 if (actionLmt.after(actionCheckTs)) {
077 XLog.getLog(getClass()).debug(
078 "The wf action :" + id + " has been udated recently. Ignoring ActionCheckCommand!");
079 return null;
080 }
081 }
082 if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
083 ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
084 if (executor != null) {
085 ActionExecutorContext context = null;
086 try {
087 boolean isRetry = false;
088 context = new ActionCommand.ActionExecutorContext(workflow, action, isRetry);
089 incrActionCounter(action.getType(), 1);
090
091 Instrumentation.Cron cron = new Instrumentation.Cron();
092 cron.start();
093 executor.check(context, action);
094 cron.stop();
095 addActionCron(action.getType(), cron);
096
097 if (action.isExecutionComplete()) {
098 if (!context.isExecuted()) {
099 XLog.getLog(getClass()).warn(XLog.OPS,
100 "Action Completed, ActionExecutor [{0}] must call setExecutionData()",
101 executor.getType());
102 action.setErrorInfo(EXEC_DATA_MISSING,
103 "Execution Complete, but Execution Data Missing from Action");
104 failJob(context);
105 action.setLastCheckTime(new Date());
106 store.updateAction(action);
107 store.updateWorkflow(workflow);
108 return null;
109 }
110 action.setPending();
111 queueCallable(new ActionEndCommand(action.getId(), action.getType()));
112 }
113 action.setLastCheckTime(new Date());
114 store.updateAction(action);
115 store.updateWorkflow(workflow);
116 }
117 catch (ActionExecutorException ex) {
118 XLog.getLog(getClass()).warn(
119 "Exception while executing check(). Error Code [{0}], Message[{1}]", ex.getErrorCode(),
120 ex.getMessage(), ex);
121
122 switch (ex.getErrorType()) {
123 case FAILED:
124 failAction(workflow, action);
125 break;
126 }
127 action.setLastCheckTime(new Date());
128 store.updateAction(action);
129 store.updateWorkflow(workflow);
130 return null;
131 }
132 }
133 }
134 else {
135 action.setLastCheckTime(new Date());
136 store.updateAction(action);
137 XLog.getLog(getClass()).warn(
138 "Action [{0}] status is running but WF Job [{1}] status is [{2}]. Expected status is RUNNING.",
139 action.getId(), workflow.getId(), workflow.getStatus());
140 }
141 }
142 return null;
143 }
144
145 private void failAction(WorkflowJobBean workflow, WorkflowActionBean action) throws CommandException {
146 XLog.getLog(getClass()).warn("Failing Job [{0}] due to failed action [{1}]", workflow.getId(), action.getId());
147 action.resetPending();
148 action.setStatus(Status.FAILED);
149 workflow.setStatus(WorkflowJob.Status.FAILED);
150 incrJobCounter(INSTR_FAILED_JOBS_COUNTER, 1);
151 }
152
153 /**
154 * @param args
155 * @throws Exception
156 */
157 public static void main(String[] args) throws Exception {
158 new Services().init();
159
160 try {
161 new ActionCheckCommand("0000001-100122154231282-oozie-dani-W@pig1").call();
162 Thread.sleep(100000);
163 }
164 finally {
165 new Services().destroy();
166 }
167 }
168
169 @Override
170 protected Void execute(WorkflowStore store) throws CommandException, StoreException {
171 try {
172 XLog.getLog(getClass()).debug("STARTED ActionCheckCommand for wf actionId=" + id + " priority =" + getPriority());
173 jobId = Services.get().get(UUIDService.class).getId(id);
174 if (lock(jobId)) {
175 call(store);
176 }
177 else {
178 queueCallable(new ActionCheckCommand(id, actionCheckDelay), LOCK_FAILURE_REQUEUE_INTERVAL);
179 XLog.getLog(getClass()).warn("ActionCheckCommand lock was not acquired - failed {0}", id);
180 }
181 }
182 catch (InterruptedException e) {
183 queueCallable(new ActionCheckCommand(id, actionCheckDelay), LOCK_FAILURE_REQUEUE_INTERVAL);
184 XLog.getLog(getClass()).warn("ActionCheckCommand lock was not acquired - interrupted exception failed {0}",
185 id);
186 }
187 XLog.getLog(getClass()).debug("ENDED ActionCheckCommand for wf actionId=" + id + ", jobId=" + jobId);
188 return null;
189 }
190 }