001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import org.apache.hadoop.conf.Configuration;
021import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
022import org.apache.oozie.util.XLog;
023
024import java.util.concurrent.Callable;
025import java.util.concurrent.ScheduledExecutorService;
026import java.util.concurrent.ScheduledThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028
029/**
030 * This service executes scheduled Runnables and Callables at regular intervals. <p/> It uses a
031 * java.util.concurrent.ScheduledExecutorService. <p/> The {@link #SCHEDULER_THREADS} configuration property indicates
032 * how many threads the scheduler will use to run scheduled commands.
033 */
034public class SchedulerService implements Service {
035
036    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService.";
037
038    public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads";
039
040    private final XLog log = XLog.getLog(getClass());
041
042    private ScheduledExecutorService scheduler;
043
044    /**
045     * Initialize the scheduler service.
046     *
047     * @param services services instance.
048     */
049    @Override
050    public void init(Services services) {
051        scheduler = new ScheduledThreadPoolExecutor(getSchedulableThreads(services.getConf()));
052    }
053
054    /**
055     * Destroy the scheduler service.
056     */
057    @Override
058    public void destroy() {
059        try {
060            long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
061            scheduler.shutdownNow();
062            while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
063                log.info("Waiting for scheduler to shutdown");
064                if (System.currentTimeMillis() > limit) {
065                    log.warn("Gave up, continuing without waiting for scheduler to shutdown");
066                    break;
067                }
068            }
069        }
070        catch (InterruptedException ex) {
071            log.warn(ex);
072        }
073    }
074
075    /**
076     * Return the public interface for scheduler service.
077     *
078     * @return {@link SchedulerService}.
079     */
080    @Override
081    public Class<? extends Service> getInterface() {
082        return SchedulerService.class;
083    }
084
085    /**
086     * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p/>
087     *
088     * @return the scheduled executor service instance.
089     */
090    public ScheduledExecutorService getScheduler() {
091        return scheduler;
092    }
093
094    /**
095     * Return the number of threads configured with the Scheduler Service
096     * @param conf
097     * @return int num threads
098     */
099    public int getSchedulableThreads(Configuration conf) {
100        return conf.getInt(SCHEDULER_THREADS, 10);
101    }
102
103    public enum Unit {
104        MILLISEC(1),
105        SEC(1000),
106        MIN(1000 * 60),
107        HOUR(1000 * 60 * 60);
108
109        private long millis;
110
111        private Unit(long millis) {
112            this.millis = millis;
113        }
114
115        private long getMillis() {
116            return millis;
117        }
118
119    }
120
121    /**
122     * Schedule a Callable for execution.
123     *
124     * @param callable callable to schedule for execution.
125     * @param delay delay for first execution since scheduling.
126     * @param interval interval between executions.
127     * @param unit scheduling unit.
128     */
129    public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) {
130        log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]",
131                  callable.getClass(), delay, interval, unit);
132        Runnable r = new Runnable() {
133            public void run() {
134                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
135                    log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run");
136                    return;
137                }
138                try {
139                    callable.call();
140                }
141                catch (Exception ex) {
142                    log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(),
143                             ex.getMessage(), ex);
144                }
145            }
146        };
147        if (!scheduler.isShutdown()) {
148            scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
149                                             TimeUnit.MILLISECONDS);
150        }
151        else {
152            log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass());
153        }
154    }
155
156    /**
157     * Schedule a Runnable for execution.
158     *
159     * @param runnable Runnable to schedule for execution.
160     * @param delay delay for first execution since scheduling.
161     * @param interval interval between executions.
162     * @param unit scheduling unit.
163     */
164    public void schedule(final Runnable runnable, long delay, long interval, Unit unit) {
165        log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]",
166                  runnable.getClass(), delay, interval, unit);
167        Runnable r = new Runnable() {
168            public void run() {
169                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
170                    log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
171                    return;
172                }
173                try {
174                    runnable.run();
175                }
176                catch (Exception ex) {
177                    log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
178                             ex.getMessage(), ex);
179                }
180            }
181        };
182        if (!scheduler.isShutdown()) {
183            scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
184                                                 TimeUnit.MILLISECONDS);
185        }
186        else {
187            log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
188        }
189    }
190
191    /**
192     * Schedule a Runnable for execution.
193     *
194     * @param runnable Runnable to schedule for execution.
195     * @param delay the time from now to delay execution.
196     * @param unit scheduling unit.
197     */
198    public void schedule(final Runnable runnable, long delay, Unit unit) {
199        log.trace("Scheduling runnable [{0}], delay [{1}] in [{2}]",
200                  runnable.getClass(), delay, unit);
201        Runnable r = new Runnable() {
202            public void run() {
203                if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
204                    log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
205                    return;
206                }
207                try {
208                    runnable.run();
209                }
210                catch (Exception ex) {
211                    log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
212                             ex.getMessage(), ex);
213                }
214            }
215        };
216        if (!scheduler.isShutdown()) {
217            scheduler.schedule(r, delay * unit.getMillis(), TimeUnit.MILLISECONDS);
218        }
219        else {
220            log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
221        }
222    }
223
224}