This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
023import org.apache.oozie.util.XLog;
024
025import java.util.concurrent.Callable;
026import java.util.concurrent.ScheduledExecutorService;
027import java.util.concurrent.ScheduledThreadPoolExecutor;
028import java.util.concurrent.TimeUnit;
029
030/**
031 * This service executes scheduled Runnables and Callables at regular intervals. <p/> It uses a
032 * java.util.concurrent.ScheduledExecutorService. <p/> The {@link #SCHEDULER_THREADS} configuration property indicates
033 * how many threads the scheduler will use to run scheduled commands.
034 */
035public class SchedulerService implements Service {
036
037    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService.";
038
039    public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads";
040
041    private final XLog log = XLog.getLog(getClass());
042
043    private ScheduledExecutorService scheduler;
044
045    /**
046     * Initialize the scheduler service.
047     *
048     * @param services services instance.
049     */
050    @Override
051    public void init(Services services) {
052        scheduler = new ScheduledThreadPoolExecutor(getSchedulableThreads(services.getConf()));
053    }
054
055    /**
056     * Destroy the scheduler service.
057     */
058    @Override
059    public void destroy() {
060        try {
061            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
062            scheduler.shutdownNow();
063            while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
064                log.info("Waiting for scheduler to shutdown");
065                if (System.currentTimeMillis() > limit) {
066                    log.warn("Gave up, continuing without waiting for scheduler to shutdown");
067                    break;
068                }
069            }
070        }
071        catch (InterruptedException ex) {
072            log.warn(ex);
073        }
074    }
075
076    /**
077     * Return the public interface for scheduler service.
078     *
079     * @return {@link SchedulerService}.
080     */
081    @Override
082    public Class<? extends Service> getInterface() {
083        return SchedulerService.class;
084    }
085
086    /**
087     * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p/>
088     *
089     * @return the scheduled executor service instance.
090     */
091    public ScheduledExecutorService getScheduler() {
092        return scheduler;
093    }
094
095    /**
096     * Return the number of threads configured with the Scheduler Service
097     * @param conf
098     * @return int num threads
099     */
100    public int getSchedulableThreads(Configuration conf) {
101        return ConfigurationService.getInt(conf, SCHEDULER_THREADS);
102    }
103
104    public enum Unit {
105        MILLISEC(1),
106        SEC(1000),
107        MIN(1000 * 60),
108        HOUR(1000 * 60 * 60);
109
110        private long millis;
111
112        private Unit(long millis) {
113            this.millis = millis;
114        }
115
116        private long getMillis() {
117            return millis;
118        }
119
120    }
121
122    /**
123     * Schedule a Callable for execution.
124     *
125     * @param callable callable to schedule for execution.
126     * @param delay delay for first execution since scheduling.
127     * @param interval interval between executions.
128     * @param unit scheduling unit.
129     */
130    public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) {
131        log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]",
132                  callable.getClass(), delay, interval, unit);
133        Runnable r = new Runnable() {
134            public void run() {
135                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
136                    log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run");
137                    return;
138                }
139                try {
140                    callable.call();
141                }
142                catch (Exception ex) {
143                    log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(),
144                             ex.getMessage(), ex);
145                }
146            }
147        };
148        if (!scheduler.isShutdown()) {
149            scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
150                                             TimeUnit.MILLISECONDS);
151        }
152        else {
153            log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass());
154        }
155    }
156
157    /**
158     * Schedule a Runnable for execution.
159     *
160     * @param runnable Runnable to schedule for execution.
161     * @param delay delay for first execution since scheduling.
162     * @param interval interval between executions.
163     * @param unit scheduling unit.
164     */
165    public void schedule(final Runnable runnable, long delay, long interval, Unit unit) {
166        log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]",
167                  runnable.getClass(), delay, interval, unit);
168        Runnable r = new Runnable() {
169            public void run() {
170                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
171                    log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
172                    return;
173                }
174                try {
175                    runnable.run();
176                }
177                catch (Exception ex) {
178                    log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
179                             ex.getMessage(), ex);
180                }
181            }
182        };
183        if (!scheduler.isShutdown()) {
184            scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
185                                                 TimeUnit.MILLISECONDS);
186        }
187        else {
188            log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
189        }
190    }
191
192    /**
193     * Schedule a Runnable for execution.
194     *
195     * @param runnable Runnable to schedule for execution.
196     * @param delay the time from now to delay execution.
197     * @param unit scheduling unit.
198     */
199    public void schedule(final Runnable runnable, long delay, Unit unit) {
200        log.trace("Scheduling runnable [{0}], delay [{1}] in [{2}]",
201                  runnable.getClass(), delay, unit);
202        Runnable r = new Runnable() {
203            public void run() {
204                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
205                    log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
206                    return;
207                }
208                try {
209                    runnable.run();
210                }
211                catch (Exception ex) {
212                    log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
213                             ex.getMessage(), ex);
214                }
215            }
216        };
217        if (!scheduler.isShutdown()) {
218            scheduler.schedule(r, delay * unit.getMillis(), TimeUnit.MILLISECONDS);
219        }
220        else {
221            log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
222        }
223    }
224
225}