This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
021 import org.apache.oozie.util.XLog;
022
023 import java.util.concurrent.Callable;
024 import java.util.concurrent.ScheduledExecutorService;
025 import java.util.concurrent.ScheduledThreadPoolExecutor;
026 import java.util.concurrent.TimeUnit;
027
028 /**
029 * This service executes scheduled Runnables and Callables at regular intervals. <p/> It uses a
030 * java.util.concurrent.ScheduledExecutorService. <p/> The {@link #SCHEDULER_THREADS} configuration property indicates
031 * how many threads the scheduler will use to run scheduled commands.
032 */
033 public class SchedulerService implements Service {
034
035 public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService.";
036
037 public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads";
038
039 private final XLog log = XLog.getLog(getClass());
040
041 private ScheduledExecutorService scheduler;
042
043 /**
044 * Initialize the scheduler service.
045 *
046 * @param services services instance.
047 */
048 @Override
049 public void init(Services services) {
050 scheduler = new ScheduledThreadPoolExecutor(services.getConf().getInt(SCHEDULER_THREADS, 5));
051 }
052
053 /**
054 * Destroy the scheduler service.
055 */
056 @Override
057 public void destroy() {
058 try {
059 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
060 scheduler.shutdownNow();
061 while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
062 log.info("Waiting for scheduler to shutdown");
063 if (System.currentTimeMillis() > limit) {
064 log.warn("Gave up, continuing without waiting for scheduler to shutdown");
065 break;
066 }
067 }
068 }
069 catch (InterruptedException ex) {
070 log.warn(ex);
071 }
072 }
073
074 /**
075 * Return the public interface for scheduler service.
076 *
077 * @return {@link SchedulerService}.
078 */
079 @Override
080 public Class<? extends Service> getInterface() {
081 return SchedulerService.class;
082 }
083
084 /**
085 * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p/>
086 *
087 * @return the scheduled executor service instance.
088 */
089 public ScheduledExecutorService getScheduler() {
090 return scheduler;
091 }
092
093 public enum Unit {
094 MILLISEC(1),
095 SEC(1000),
096 MIN(1000 * 60),
097 HOUR(1000 * 60 * 60);
098
099 private long millis;
100
101 private Unit(long millis) {
102 this.millis = millis;
103 }
104
105 private long getMillis() {
106 return millis;
107 }
108
109 }
110
111 /**
112 * Schedule a Callable for execution.
113 *
114 * @param callable callable to schedule for execution.
115 * @param delay delay for first execution since scheduling.
116 * @param interval interval between executions.
117 * @param unit scheduling unit.
118 */
119 public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) {
120 log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]",
121 callable.getClass(), delay, interval, unit);
122 Runnable r = new Runnable() {
123 public void run() {
124 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
125 log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run");
126 return;
127 }
128 try {
129 callable.call();
130 }
131 catch (Exception ex) {
132 log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(),
133 ex.getMessage(), ex);
134 }
135 }
136 };
137 if (!scheduler.isShutdown()) {
138 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
139 TimeUnit.MILLISECONDS);
140 }
141 else {
142 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass());
143 }
144 }
145
146 /**
147 * Schedule a Runnable for execution.
148 *
149 * @param runnable Runnable to schedule for execution.
150 * @param delay delay for first execution since scheduling.
151 * @param interval interval between executions.
152 * @param unit scheduling unit.
153 */
154 public void schedule(final Runnable runnable, long delay, long interval, Unit unit) {
155 log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]",
156 runnable.getClass(), delay, interval, unit);
157 Runnable r = new Runnable() {
158 public void run() {
159 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
160 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
161 return;
162 }
163 try {
164 runnable.run();
165 }
166 catch (Exception ex) {
167 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
168 ex.getMessage(), ex);
169 }
170 }
171 };
172 if (!scheduler.isShutdown()) {
173 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
174 TimeUnit.MILLISECONDS);
175 }
176 else {
177 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
178 }
179 }
180
181 }