001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
021    import org.apache.oozie.util.XLog;
022    
023    import java.util.concurrent.Callable;
024    import java.util.concurrent.ScheduledExecutorService;
025    import java.util.concurrent.ScheduledThreadPoolExecutor;
026    import java.util.concurrent.TimeUnit;
027    
028    /**
029     * This service executes scheduled Runnables and Callables at regular intervals. <p/> It uses a
030     * java.util.concurrent.ScheduledExecutorService. <p/> The {@link #SCHEDULER_THREADS} configuration property indicates
031     * how many threads the scheduler will use to run scheduled commands.
032     */
033    public class SchedulerService implements Service {
034    
035        public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService.";
036    
037        public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads";
038    
039        private final XLog log = XLog.getLog(getClass());
040    
041        private ScheduledExecutorService scheduler;
042    
043        /**
044         * Initialize the scheduler service.
045         *
046         * @param services services instance.
047         */
048        @Override
049        public void init(Services services) {
050            scheduler = new ScheduledThreadPoolExecutor(services.getConf().getInt(SCHEDULER_THREADS, 5));
051        }
052    
053        /**
054         * Destroy the scheduler service.
055         */
056        @Override
057        public void destroy() {
058            try {
059                long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds
060                scheduler.shutdownNow();
061                while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
062                    log.info("Waiting for scheduler to shutdown");
063                    if (System.currentTimeMillis() > limit) {
064                        log.warn("Gave up, continuing without waiting for scheduler to shutdown");
065                        break;
066                    }
067                }
068            }
069            catch (InterruptedException ex) {
070                log.warn(ex);
071            }
072        }
073    
074        /**
075         * Return the public interface for scheduler service.
076         *
077         * @return {@link SchedulerService}.
078         */
079        @Override
080        public Class<? extends Service> getInterface() {
081            return SchedulerService.class;
082        }
083    
084        /**
085         * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p/>
086         *
087         * @return the scheduled executor service instance.
088         */
089        public ScheduledExecutorService getScheduler() {
090            return scheduler;
091        }
092    
093        public enum Unit {
094            MILLISEC(1),
095            SEC(1000),
096            MIN(1000 * 60),
097            HOUR(1000 * 60 * 60);
098    
099            private long millis;
100    
101            private Unit(long millis) {
102                this.millis = millis;
103            }
104    
105            private long getMillis() {
106                return millis;
107            }
108    
109        }
110    
111        /**
112         * Schedule a Callable for execution.
113         *
114         * @param callable callable to schedule for execution.
115         * @param delay delay for first execution since scheduling.
116         * @param interval interval between executions.
117         * @param unit scheduling unit.
118         */
119        public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) {
120            log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]",
121                      callable.getClass(), delay, interval, unit);
122            Runnable r = new Runnable() {
123                public void run() {
124                    if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
125                        log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run");
126                        return;
127                    }
128                    try {
129                        callable.call();
130                    }
131                    catch (Exception ex) {
132                        log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(),
133                                 ex.getMessage(), ex);
134                    }
135                }
136            };
137            if (!scheduler.isShutdown()) {
138                scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
139                                                 TimeUnit.MILLISECONDS);
140            }
141            else {
142                log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass());
143            }
144        }
145    
146        /**
147         * Schedule a Runnable for execution.
148         *
149         * @param runnable Runnable to schedule for execution.
150         * @param delay delay for first execution since scheduling.
151         * @param interval interval between executions.
152         * @param unit scheduling unit.
153         */
154        public void schedule(final Runnable runnable, long delay, long interval, Unit unit) {
155            log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]",
156                      runnable.getClass(), delay, interval, unit);
157            Runnable r = new Runnable() {
158                public void run() {
159                    if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) {
160                        log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run");
161                        return;
162                    }
163                    try {
164                        runnable.run();
165                    }
166                    catch (Exception ex) {
167                        log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(),
168                                 ex.getMessage(), ex);
169                    }
170                }
171            };
172            if (!scheduler.isShutdown()) {
173                scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(),
174                                                     TimeUnit.MILLISECONDS);
175            }
176            else {
177                log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass());
178            }
179        }
180    
181    }