001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.util.concurrent.Callable;
022import java.util.concurrent.ScheduledExecutorService;
023import java.util.concurrent.ScheduledThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
028import org.apache.oozie.util.NamedThreadFactory;
029import org.apache.oozie.util.XLog;
030
031/**
032 * This service executes scheduled Runnables and Callables at regular intervals. <p> It uses a
033 * java.util.concurrent.ScheduledExecutorService. <p> The {@link #SCHEDULER_THREADS} configuration property indicates
034 * how many threads the scheduler will use to run scheduled commands.
035 */
036public class SchedulerService implements Service {
037
038    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService.";
039
040    public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads";
041
042    private final XLog log = XLog.getLog(getClass());
043
044    private ScheduledExecutorService scheduler;
045
046    /**
047     * Initialize the scheduler service.
048     *
049     * @param services services instance.
050     */
051    @Override
052    public void init(Services services) {
053        scheduler = new ScheduledThreadPoolExecutor(getSchedulableThreads(services.getConf()), new NamedThreadFactory("Scheduler"));
054    }
055
056    /**
057     * Destroy the scheduler service.
058     */
059    @Override
060    public void destroy() {
061        try {
062            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
063            scheduler.shutdownNow();
064            while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
065                log.info("Waiting for scheduler to shutdown");
066                if (System.currentTimeMillis() > limit) {
067                    log.warn("Gave up, continuing without waiting for scheduler to shutdown");
068                    break;
069                }
070            }
071        }
072        catch (InterruptedException ex) {
073            log.warn(ex);
074        }
075    }
076
077    /**
078     * Return the public interface for scheduler service.
079     *
080     * @return {@link SchedulerService}.
081     */
082    @Override
083    public Class<? extends Service> getInterface() {
084        return SchedulerService.class;
085    }
086
087    /**
088     * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p>
089     *
090     * @return the scheduled executor service instance.
091     */
092    public ScheduledExecutorService getScheduler() {
093        return scheduler;
094    }
095
096    /**
097     * Return the number of threads configured with the Scheduler Service
098     * @param conf
099     * @return int num threads
100     */
101    public int getSchedulableThreads(Configuration conf) {
102        return ConfigurationService.getInt(conf, SCHEDULER_THREADS);
103    }
104
105    public enum Unit {
106        MILLISEC(1),
107        SEC(1000),
108        MIN(1000 * 60),
109        HOUR(1000 * 60 * 60);
110
111        private long millis;
112
113        private Unit(long millis) {
114            this.millis = millis;
115        }
116
117        private long getMillis() {
118            return millis;
119        }
120
121    }
122
123    /**
124     * Schedule a Callable for execution.
125     *
126     * @param callable callable to schedule for execution.
127     * @param delay delay for first execution since scheduling.
128     * @param interval interval between executions.
129     * @param unit scheduling unit.
130     */
131    public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) {
132        log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]",
133                  callable.getClass(), delay, interval, unit);
134        Runnable r = new Runnable() {
135            public void run() {
136                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
137                    log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run");
138                    return;
139                }
140                try {
141                    callable.call();
142                }
143                catch (Exception ex) {
144                    log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(),
145                             ex.getMessage(), ex);
146                }
147            }
148        };
149        if (!scheduler.isShutdown()) {
150            scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
151                                             TimeUnit.MILLISECONDS);
152        }
153        else {
154            log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass());
155        }
156    }
157
158    /**
159     * Schedule a Runnable for execution.
160     *
161     * @param runnable Runnable to schedule for execution.
162     * @param delay delay for first execution since scheduling.
163     * @param interval interval between executions.
164     * @param unit scheduling unit.
165     */
166    public void schedule(final Runnable runnable, long delay, long interval, Unit unit) {
167        log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]",
168                  runnable.getClass(), delay, interval, unit);
169        Runnable r = new Runnable() {
170            public void run() {
171                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
172                    log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
173                    return;
174                }
175                try {
176                    runnable.run();
177                }
178                catch (Exception ex) {
179                    log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
180                             ex.getMessage(), ex);
181                }
182            }
183        };
184        if (!scheduler.isShutdown()) {
185            scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
186                                                 TimeUnit.MILLISECONDS);
187        }
188        else {
189            log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
190        }
191    }
192
193    /**
194     * Schedule a Runnable for execution.
195     *
196     * @param runnable Runnable to schedule for execution.
197     * @param delay the time from now to delay execution.
198     * @param unit scheduling unit.
199     */
200    public void schedule(final Runnable runnable, long delay, Unit unit) {
201        log.trace("Scheduling runnable [{0}], delay [{1}] in [{2}]",
202                  runnable.getClass(), delay, unit);
203        Runnable r = new Runnable() {
204            public void run() {
205                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
206                    log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
207                    return;
208                }
209                try {
210                    runnable.run();
211                }
212                catch (Exception ex) {
213                    log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
214                             ex.getMessage(), ex);
215                }
216            }
217        };
218        if (!scheduler.isShutdown()) {
219            scheduler.schedule(r, delay * unit.getMillis(), TimeUnit.MILLISECONDS);
220        }
221        else {
222            log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
223        }
224    }
225
226}