This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.List;
022
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.oozie.CoordinatorActionBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.WorkflowActionBean;
027 import org.apache.oozie.command.CommandException;
028 import org.apache.oozie.command.coord.CoordActionCheckCommand;
029 import org.apache.oozie.command.coord.CoordActionCheckXCommand;
030 import org.apache.oozie.command.wf.ActionCheckCommand;
031 import org.apache.oozie.command.wf.ActionCheckXCommand;
032 import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor;
033 import org.apache.oozie.executor.jpa.JPAExecutorException;
034 import org.apache.oozie.executor.jpa.WorkflowActionsRunningGetJPAExecutor;
035 import org.apache.oozie.util.XCallable;
036 import org.apache.oozie.util.XLog;
037
038 /**
039 * The Action Checker Service queue ActionCheckCommands to check the status of
040 * running actions and CoordActionCheckCommands to check the status of
041 * coordinator actions. The delay between checks on the same action can be
042 * configured.
043 */
044 public class ActionCheckerService implements Service {
045
046 public static final String CONF_PREFIX = Service.CONF_PREFIX + "ActionCheckerService.";
047 /**
048 * The frequency at which the ActionCheckService will run.
049 */
050 public static final String CONF_ACTION_CHECK_INTERVAL = CONF_PREFIX + "action.check.interval";
051 /**
052 * The time, in seconds, between an ActionCheck for the same action.
053 */
054 public static final String CONF_ACTION_CHECK_DELAY = CONF_PREFIX + "action.check.delay";
055
056 /**
057 * The number of callables to be queued in a batch.
058 */
059 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
060
061 protected static final String INSTRUMENTATION_GROUP = "actionchecker";
062 protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions";
063 protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions";
064
065 private static boolean useXCommand = true;
066
067 /**
068 * {@link ActionCheckRunnable} is the runnable which is scheduled to run and
069 * queue Action checks.
070 */
071 static class ActionCheckRunnable implements Runnable {
072 private int actionCheckDelay;
073 private List<XCallable<Void>> callables;
074 private StringBuilder msg = null;
075
076 public ActionCheckRunnable(int actionCheckDelay) {
077 this.actionCheckDelay = actionCheckDelay;
078 }
079
080 public void run() {
081 XLog.Info.get().clear();
082 XLog LOG = XLog.getLog(getClass());
083 msg = new StringBuilder();
084 try {
085 runWFActionCheck();
086 runCoordActionCheck();
087 }
088 catch (CommandException ce) {
089 LOG.error("Unable to run action checks, ", ce);
090 }
091
092 LOG.debug("QUEUING [{0}] for potential checking", msg.toString());
093 if (null != callables) {
094 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
095 if (ret == false) {
096 LOG.warn("Unable to queue the callables commands for CheckerService. "
097 + "Most possibly command queue is full. Queue size is :"
098 + Services.get().get(CallableQueueService.class).queueSize());
099 }
100 callables = null;
101 }
102 }
103
104 /**
105 * check workflow actions
106 *
107 * @throws CommandException
108 */
109 private void runWFActionCheck() throws CommandException {
110 JPAService jpaService = Services.get().get(JPAService.class);
111 if (jpaService == null) {
112 throw new CommandException(ErrorCode.E0610);
113 }
114
115 List<WorkflowActionBean> actions;
116 try {
117 actions = jpaService
118 .execute(new WorkflowActionsRunningGetJPAExecutor(actionCheckDelay));
119 }
120 catch (JPAExecutorException je) {
121 throw new CommandException(je);
122 }
123
124 if (actions == null || actions.size() == 0) {
125 return;
126 }
127 msg.append(" WF_ACTIONS : " + actions.size());
128
129 for (WorkflowActionBean action : actions) {
130 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
131 INSTR_CHECK_ACTIONS_COUNTER, 1);
132 if (useXCommand) {
133 queueCallable(new ActionCheckXCommand(action.getId()));
134 }
135 else {
136 queueCallable(new ActionCheckCommand(action.getId()));
137 }
138 }
139
140 }
141
142 /**
143 * check coordinator actions
144 *
145 * @throws CommandException
146 */
147 private void runCoordActionCheck() throws CommandException {
148 JPAService jpaService = Services.get().get(JPAService.class);
149 if (jpaService == null) {
150 throw new CommandException(ErrorCode.E0610);
151 }
152
153 List<CoordinatorActionBean> cactions;
154 try {
155 cactions = jpaService.execute(new CoordActionsRunningGetJPAExecutor(
156 actionCheckDelay));
157 }
158 catch (JPAExecutorException je) {
159 throw new CommandException(je);
160 }
161
162 if (cactions == null || cactions.size() == 0) {
163 return;
164 }
165
166 msg.append(" COORD_ACTIONS : " + cactions.size());
167
168 for (CoordinatorActionBean caction : cactions) {
169 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
170 INSTR_CHECK_COORD_ACTIONS_COUNTER, 1);
171 if (useXCommand) {
172 queueCallable(new CoordActionCheckXCommand(caction.getId(), actionCheckDelay));
173 }
174 else {
175 queueCallable(new CoordActionCheckCommand(caction.getId(), actionCheckDelay));
176 }
177 }
178
179 }
180
181 /**
182 * Adds callables to a list. If the number of callables in the list
183 * reaches {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE}, the
184 * entire batch is queued and the callables list is reset.
185 *
186 * @param callable the callable to queue.
187 */
188 private void queueCallable(XCallable<Void> callable) {
189 if (callables == null) {
190 callables = new ArrayList<XCallable<Void>>();
191 }
192 callables.add(callable);
193 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
194 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
195 if (ret == false) {
196 XLog.getLog(getClass()).warn(
197 "Unable to queue the callables commands for CheckerService. "
198 + "Most possibly command queue is full. Queue size is :"
199 + Services.get().get(CallableQueueService.class).queueSize());
200 }
201 callables = new ArrayList<XCallable<Void>>();
202 }
203 }
204 }
205
206 /**
207 * Initializes the Action Check service.
208 *
209 * @param services services instance.
210 */
211 @Override
212 public void init(Services services) {
213 Configuration conf = services.getConf();
214 Runnable actionCheckRunnable = new ActionCheckRunnable(conf.getInt(CONF_ACTION_CHECK_DELAY, 600));
215 services.get(SchedulerService.class).schedule(actionCheckRunnable, 10,
216 conf.getInt(CONF_ACTION_CHECK_INTERVAL, 60), SchedulerService.Unit.SEC);
217
218 if (Services.get().getConf().getBoolean(USE_XCOMMAND, true) == false) {
219 useXCommand = false;
220 }
221
222 }
223
224 /**
225 * Destroy the Action Checker Services.
226 */
227 @Override
228 public void destroy() {
229 }
230
231 /**
232 * Return the public interface for the action checker service.
233 *
234 * @return {@link ActionCheckerService}.
235 */
236 @Override
237 public Class<? extends Service> getInterface() {
238 return ActionCheckerService.class;
239 }
240 }