001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
022    import org.apache.oozie.util.XLog;
023    
024    import java.util.concurrent.Callable;
025    import java.util.concurrent.ScheduledExecutorService;
026    import java.util.concurrent.ScheduledThreadPoolExecutor;
027    import java.util.concurrent.TimeUnit;
028    
029    /**
030     * This service executes scheduled Runnables and Callables at regular intervals. <p/> It uses a
031     * java.util.concurrent.ScheduledExecutorService. <p/> The {@link #SCHEDULER_THREADS} configuration property indicates
032     * how many threads the scheduler will use to run scheduled commands.
033     */
034    public class SchedulerService implements Service {
035    
036        public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService.";
037    
038        public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads";
039    
040        private final XLog log = XLog.getLog(getClass());
041    
042        private ScheduledExecutorService scheduler;
043    
044        /**
045         * Initialize the scheduler service.
046         *
047         * @param services services instance.
048         */
049        @Override
050        public void init(Services services) {
051            scheduler = new ScheduledThreadPoolExecutor(getSchedulableThreads(services.getConf()));
052        }
053    
054        /**
055         * Destroy the scheduler service.
056         */
057        @Override
058        public void destroy() {
059            try {
060                long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
061                scheduler.shutdownNow();
062                while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
063                    log.info("Waiting for scheduler to shutdown");
064                    if (System.currentTimeMillis() > limit) {
065                        log.warn("Gave up, continuing without waiting for scheduler to shutdown");
066                        break;
067                    }
068                }
069            }
070            catch (InterruptedException ex) {
071                log.warn(ex);
072            }
073        }
074    
075        /**
076         * Return the public interface for scheduler service.
077         *
078         * @return {@link SchedulerService}.
079         */
080        @Override
081        public Class<? extends Service> getInterface() {
082            return SchedulerService.class;
083        }
084    
085        /**
086         * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p/>
087         *
088         * @return the scheduled executor service instance.
089         */
090        public ScheduledExecutorService getScheduler() {
091            return scheduler;
092        }
093    
094        /**
095         * Return the number of threads configured with the Scheduler Service
096         * @param conf
097         * @return int num threads
098         */
099        public int getSchedulableThreads(Configuration conf) {
100            return conf.getInt(SCHEDULER_THREADS, 10);
101        }
102    
103        public enum Unit {
104            MILLISEC(1),
105            SEC(1000),
106            MIN(1000 * 60),
107            HOUR(1000 * 60 * 60);
108    
109            private long millis;
110    
111            private Unit(long millis) {
112                this.millis = millis;
113            }
114    
115            private long getMillis() {
116                return millis;
117            }
118    
119        }
120    
121        /**
122         * Schedule a Callable for execution.
123         *
124         * @param callable callable to schedule for execution.
125         * @param delay delay for first execution since scheduling.
126         * @param interval interval between executions.
127         * @param unit scheduling unit.
128         */
129        public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) {
130            log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]",
131                      callable.getClass(), delay, interval, unit);
132            Runnable r = new Runnable() {
133                public void run() {
134                    if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
135                        log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run");
136                        return;
137                    }
138                    try {
139                        callable.call();
140                    }
141                    catch (Exception ex) {
142                        log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(),
143                                 ex.getMessage(), ex);
144                    }
145                }
146            };
147            if (!scheduler.isShutdown()) {
148                scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
149                                                 TimeUnit.MILLISECONDS);
150            }
151            else {
152                log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass());
153            }
154        }
155    
156        /**
157         * Schedule a Runnable for execution.
158         *
159         * @param runnable Runnable to schedule for execution.
160         * @param delay delay for first execution since scheduling.
161         * @param interval interval between executions.
162         * @param unit scheduling unit.
163         */
164        public void schedule(final Runnable runnable, long delay, long interval, Unit unit) {
165            log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]",
166                      runnable.getClass(), delay, interval, unit);
167            Runnable r = new Runnable() {
168                public void run() {
169                    if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
170                        log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
171                        return;
172                    }
173                    try {
174                        runnable.run();
175                    }
176                    catch (Exception ex) {
177                        log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
178                                 ex.getMessage(), ex);
179                    }
180                }
181            };
182            if (!scheduler.isShutdown()) {
183                scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
184                                                     TimeUnit.MILLISECONDS);
185            }
186            else {
187                log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
188            }
189        }
190    
191        /**
192         * Schedule a Runnable for execution.
193         *
194         * @param runnable Runnable to schedule for execution.
195         * @param delay the time from now to delay execution.
196         * @param unit scheduling unit.
197         */
198        public void schedule(final Runnable runnable, long delay, Unit unit) {
199            log.trace("Scheduling runnable [{0}], delay [{1}] in [{2}]",
200                      runnable.getClass(), delay, unit);
201            Runnable r = new Runnable() {
202                public void run() {
203                    if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
204                        log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
205                        return;
206                    }
207                    try {
208                        runnable.run();
209                    }
210                    catch (Exception ex) {
211                        log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
212                                 ex.getMessage(), ex);
213                    }
214                }
215            };
216            if (!scheduler.isShutdown()) {
217                scheduler.schedule(r, delay * unit.getMillis(), TimeUnit.MILLISECONDS);
218            }
219            else {
220                log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
221            }
222        }
223    
224    }