This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.util.ArrayList;
021 import java.util.List;
022
023 import org.apache.hadoop.conf.Configuration;
024 import org.apache.oozie.CoordinatorActionBean;
025 import org.apache.oozie.ErrorCode;
026 import org.apache.oozie.WorkflowActionBean;
027 import org.apache.oozie.command.CommandException;
028 import org.apache.oozie.command.coord.CoordActionCheckXCommand;
029 import org.apache.oozie.command.wf.ActionCheckXCommand;
030 import org.apache.oozie.executor.jpa.CoordActionsRunningGetJPAExecutor;
031 import org.apache.oozie.executor.jpa.JPAExecutorException;
032 import org.apache.oozie.executor.jpa.WorkflowActionsRunningGetJPAExecutor;
033 import org.apache.oozie.util.XCallable;
034 import org.apache.oozie.util.XLog;
035
036 /**
037 * The Action Checker Service queue ActionCheckCommands to check the status of
038 * running actions and CoordActionCheckCommands to check the status of
039 * coordinator actions. The delay between checks on the same action can be
040 * configured.
041 */
042 public class ActionCheckerService implements Service {
043
044 public static final String CONF_PREFIX = Service.CONF_PREFIX + "ActionCheckerService.";
045 /**
046 * The frequency at which the ActionCheckService will run.
047 */
048 public static final String CONF_ACTION_CHECK_INTERVAL = CONF_PREFIX + "action.check.interval";
049 /**
050 * The time, in seconds, between an ActionCheck for the same action.
051 */
052 public static final String CONF_ACTION_CHECK_DELAY = CONF_PREFIX + "action.check.delay";
053
054 /**
055 * The number of callables to be queued in a batch.
056 */
057 public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
058
059 protected static final String INSTRUMENTATION_GROUP = "actionchecker";
060 protected static final String INSTR_CHECK_ACTIONS_COUNTER = "checks_wf_actions";
061 protected static final String INSTR_CHECK_COORD_ACTIONS_COUNTER = "checks_coord_actions";
062
063
064 /**
065 * {@link ActionCheckRunnable} is the runnable which is scheduled to run and
066 * queue Action checks.
067 */
068 static class ActionCheckRunnable implements Runnable {
069 private int actionCheckDelay;
070 private List<XCallable<Void>> callables;
071 private StringBuilder msg = null;
072
073 public ActionCheckRunnable(int actionCheckDelay) {
074 this.actionCheckDelay = actionCheckDelay;
075 }
076
077 public void run() {
078 XLog.Info.get().clear();
079 XLog LOG = XLog.getLog(getClass());
080 msg = new StringBuilder();
081 try {
082 runWFActionCheck();
083 runCoordActionCheck();
084 }
085 catch (CommandException ce) {
086 LOG.error("Unable to run action checks, ", ce);
087 }
088
089 LOG.debug("QUEUING [{0}] for potential checking", msg.toString());
090 if (null != callables) {
091 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
092 if (ret == false) {
093 LOG.warn("Unable to queue the callables commands for CheckerService. "
094 + "Most possibly command queue is full. Queue size is :"
095 + Services.get().get(CallableQueueService.class).queueSize());
096 }
097 callables = null;
098 }
099 }
100
101 /**
102 * check workflow actions
103 *
104 * @throws CommandException
105 */
106 private void runWFActionCheck() throws CommandException {
107 JPAService jpaService = Services.get().get(JPAService.class);
108 if (jpaService == null) {
109 throw new CommandException(ErrorCode.E0610);
110 }
111
112 List<WorkflowActionBean> actions;
113 try {
114 actions = jpaService
115 .execute(new WorkflowActionsRunningGetJPAExecutor(actionCheckDelay));
116 }
117 catch (JPAExecutorException je) {
118 throw new CommandException(je);
119 }
120
121 if (actions == null || actions.size() == 0) {
122 return;
123 }
124 msg.append(" WF_ACTIONS : " + actions.size());
125
126 for (WorkflowActionBean action : actions) {
127 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
128 INSTR_CHECK_ACTIONS_COUNTER, 1);
129 queueCallable(new ActionCheckXCommand(action.getId()));
130 }
131
132 }
133
134 /**
135 * check coordinator actions
136 *
137 * @throws CommandException
138 */
139 private void runCoordActionCheck() throws CommandException {
140 JPAService jpaService = Services.get().get(JPAService.class);
141 if (jpaService == null) {
142 throw new CommandException(ErrorCode.E0610);
143 }
144
145 List<String> cactionIds;
146 try {
147 cactionIds = jpaService.execute(new CoordActionsRunningGetJPAExecutor(
148 actionCheckDelay));
149 }
150 catch (JPAExecutorException je) {
151 throw new CommandException(je);
152 }
153
154 if (cactionIds == null || cactionIds.size() == 0) {
155 return;
156 }
157
158 msg.append(" COORD_ACTIONS : " + cactionIds.size());
159 for (String coordActionId : cactionIds) {
160 Services.get().get(InstrumentationService.class).get().incr(INSTRUMENTATION_GROUP,
161 INSTR_CHECK_COORD_ACTIONS_COUNTER, 1);
162 queueCallable(new CoordActionCheckXCommand(coordActionId, actionCheckDelay));
163 }
164
165 }
166
167 /**
168 * Adds callables to a list. If the number of callables in the list
169 * reaches {@link ActionCheckerService#CONF_CALLABLE_BATCH_SIZE}, the
170 * entire batch is queued and the callables list is reset.
171 *
172 * @param callable the callable to queue.
173 */
174 private void queueCallable(XCallable<Void> callable) {
175 if (callables == null) {
176 callables = new ArrayList<XCallable<Void>>();
177 }
178 callables.add(callable);
179 if (callables.size() == Services.get().getConf().getInt(CONF_CALLABLE_BATCH_SIZE, 10)) {
180 boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
181 if (ret == false) {
182 XLog.getLog(getClass()).warn(
183 "Unable to queue the callables commands for CheckerService. "
184 + "Most possibly command queue is full. Queue size is :"
185 + Services.get().get(CallableQueueService.class).queueSize());
186 }
187 callables = new ArrayList<XCallable<Void>>();
188 }
189 }
190 }
191
192 /**
193 * Initializes the Action Check service.
194 *
195 * @param services services instance.
196 */
197 @Override
198 public void init(Services services) {
199 Configuration conf = services.getConf();
200 Runnable actionCheckRunnable = new ActionCheckRunnable(conf.getInt(CONF_ACTION_CHECK_DELAY, 600));
201 services.get(SchedulerService.class).schedule(actionCheckRunnable, 10,
202 conf.getInt(CONF_ACTION_CHECK_INTERVAL, 60), SchedulerService.Unit.SEC);
203 }
204
205 /**
206 * Destroy the Action Checker Services.
207 */
208 @Override
209 public void destroy() {
210 }
211
212 /**
213 * Return the public interface for the action checker service.
214 *
215 * @return {@link ActionCheckerService}.
216 */
217 @Override
218 public Class<? extends Service> getInterface() {
219 return ActionCheckerService.class;
220 }
221 }