This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
022 import org.apache.oozie.util.XLog;
023
024 import java.util.concurrent.Callable;
025 import java.util.concurrent.ScheduledExecutorService;
026 import java.util.concurrent.ScheduledThreadPoolExecutor;
027 import java.util.concurrent.TimeUnit;
028
029 /**
030 * This service executes scheduled Runnables and Callables at regular intervals. <p/> It uses a
031 * java.util.concurrent.ScheduledExecutorService. <p/> The {@link #SCHEDULER_THREADS} configuration property indicates
032 * how many threads the scheduler will use to run scheduled commands.
033 */
034 public class SchedulerService implements Service {
035
036 public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService.";
037
038 public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads";
039
040 private final XLog log = XLog.getLog(getClass());
041
042 private ScheduledExecutorService scheduler;
043
044 /**
045 * Initialize the scheduler service.
046 *
047 * @param services services instance.
048 */
049 @Override
050 public void init(Services services) {
051 scheduler = new ScheduledThreadPoolExecutor(getSchedulableThreads(services.getConf()));
052 }
053
054 /**
055 * Destroy the scheduler service.
056 */
057 @Override
058 public void destroy() {
059 try {
060 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
061 scheduler.shutdownNow();
062 while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
063 log.info("Waiting for scheduler to shutdown");
064 if (System.currentTimeMillis() > limit) {
065 log.warn("Gave up, continuing without waiting for scheduler to shutdown");
066 break;
067 }
068 }
069 }
070 catch (InterruptedException ex) {
071 log.warn(ex);
072 }
073 }
074
075 /**
076 * Return the public interface for scheduler service.
077 *
078 * @return {@link SchedulerService}.
079 */
080 @Override
081 public Class<? extends Service> getInterface() {
082 return SchedulerService.class;
083 }
084
085 /**
086 * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p/>
087 *
088 * @return the scheduled executor service instance.
089 */
090 public ScheduledExecutorService getScheduler() {
091 return scheduler;
092 }
093
094 /**
095 * Return the number of threads configured with the Scheduler Service
096 * @param conf
097 * @return int num threads
098 */
099 public int getSchedulableThreads(Configuration conf) {
100 return conf.getInt(SCHEDULER_THREADS, 10);
101 }
102
103 public enum Unit {
104 MILLISEC(1),
105 SEC(1000),
106 MIN(1000 * 60),
107 HOUR(1000 * 60 * 60);
108
109 private long millis;
110
111 private Unit(long millis) {
112 this.millis = millis;
113 }
114
115 private long getMillis() {
116 return millis;
117 }
118
119 }
120
121 /**
122 * Schedule a Callable for execution.
123 *
124 * @param callable callable to schedule for execution.
125 * @param delay delay for first execution since scheduling.
126 * @param interval interval between executions.
127 * @param unit scheduling unit.
128 */
129 public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) {
130 log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]",
131 callable.getClass(), delay, interval, unit);
132 Runnable r = new Runnable() {
133 public void run() {
134 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
135 log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run");
136 return;
137 }
138 try {
139 callable.call();
140 }
141 catch (Exception ex) {
142 log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(),
143 ex.getMessage(), ex);
144 }
145 }
146 };
147 if (!scheduler.isShutdown()) {
148 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
149 TimeUnit.MILLISECONDS);
150 }
151 else {
152 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass());
153 }
154 }
155
156 /**
157 * Schedule a Runnable for execution.
158 *
159 * @param runnable Runnable to schedule for execution.
160 * @param delay delay for first execution since scheduling.
161 * @param interval interval between executions.
162 * @param unit scheduling unit.
163 */
164 public void schedule(final Runnable runnable, long delay, long interval, Unit unit) {
165 log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]",
166 runnable.getClass(), delay, interval, unit);
167 Runnable r = new Runnable() {
168 public void run() {
169 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
170 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
171 return;
172 }
173 try {
174 runnable.run();
175 }
176 catch (Exception ex) {
177 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
178 ex.getMessage(), ex);
179 }
180 }
181 };
182 if (!scheduler.isShutdown()) {
183 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
184 TimeUnit.MILLISECONDS);
185 }
186 else {
187 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
188 }
189 }
190
191 /**
192 * Schedule a Runnable for execution.
193 *
194 * @param runnable Runnable to schedule for execution.
195 * @param delay the time from now to delay execution.
196 * @param unit scheduling unit.
197 */
198 public void schedule(final Runnable runnable, long delay, Unit unit) {
199 log.trace("Scheduling runnable [{0}], delay [{1}] in [{2}]",
200 runnable.getClass(), delay, unit);
201 Runnable r = new Runnable() {
202 public void run() {
203 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
204 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
205 return;
206 }
207 try {
208 runnable.run();
209 }
210 catch (Exception ex) {
211 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
212 ex.getMessage(), ex);
213 }
214 }
215 };
216 if (!scheduler.isShutdown()) {
217 scheduler.schedule(r, delay * unit.getMillis(), TimeUnit.MILLISECONDS);
218 }
219 else {
220 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
221 }
222 }
223
224 }