001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.util.concurrent.Callable; 022import java.util.concurrent.ScheduledExecutorService; 023import java.util.concurrent.ScheduledThreadPoolExecutor; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 028import org.apache.oozie.util.NamedThreadFactory; 029import org.apache.oozie.util.XLog; 030 031/** 032 * This service executes scheduled Runnables and Callables at regular intervals. <p> It uses a 033 * java.util.concurrent.ScheduledExecutorService. <p> The {@link #SCHEDULER_THREADS} configuration property indicates 034 * how many threads the scheduler will use to run scheduled commands. 035 */ 036public class SchedulerService implements Service { 037 038 public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService."; 039 040 public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads"; 041 042 private final XLog log = XLog.getLog(getClass()); 043 044 private ScheduledExecutorService scheduler; 045 046 /** 047 * Initialize the scheduler service. 048 * 049 * @param services services instance. 050 */ 051 @Override 052 public void init(Services services) { 053 scheduler = new ScheduledThreadPoolExecutor(getSchedulableThreads(services.getConf()), new NamedThreadFactory("Scheduler")); 054 } 055 056 /** 057 * Destroy the scheduler service. 058 */ 059 @Override 060 public void destroy() { 061 try { 062 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds 063 scheduler.shutdownNow(); 064 while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) { 065 log.info("Waiting for scheduler to shutdown"); 066 if (System.currentTimeMillis() > limit) { 067 log.warn("Gave up, continuing without waiting for scheduler to shutdown"); 068 break; 069 } 070 } 071 } 072 catch (InterruptedException ex) { 073 log.warn(ex); 074 } 075 } 076 077 /** 078 * Return the public interface for scheduler service. 079 * 080 * @return {@link SchedulerService}. 081 */ 082 @Override 083 public Class<? extends Service> getInterface() { 084 return SchedulerService.class; 085 } 086 087 /** 088 * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p> 089 * 090 * @return the scheduled executor service instance. 091 */ 092 public ScheduledExecutorService getScheduler() { 093 return scheduler; 094 } 095 096 /** 097 * Return the number of threads configured with the Scheduler Service 098 * @param conf 099 * @return int num threads 100 */ 101 public int getSchedulableThreads(Configuration conf) { 102 return ConfigurationService.getInt(conf, SCHEDULER_THREADS); 103 } 104 105 public enum Unit { 106 MILLISEC(1), 107 SEC(1000), 108 MIN(1000 * 60), 109 HOUR(1000 * 60 * 60); 110 111 private long millis; 112 113 private Unit(long millis) { 114 this.millis = millis; 115 } 116 117 private long getMillis() { 118 return millis; 119 } 120 121 } 122 123 /** 124 * Schedule a Callable for execution. 125 * 126 * @param callable callable to schedule for execution. 127 * @param delay delay for first execution since scheduling. 128 * @param interval interval between executions. 129 * @param unit scheduling unit. 130 */ 131 public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) { 132 log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]", 133 callable.getClass(), delay, interval, unit); 134 Runnable r = new Runnable() { 135 public void run() { 136 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 137 log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run"); 138 return; 139 } 140 try { 141 callable.call(); 142 } 143 catch (Exception ex) { 144 log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(), 145 ex.getMessage(), ex); 146 } 147 } 148 }; 149 if (!scheduler.isShutdown()) { 150 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(), 151 TimeUnit.MILLISECONDS); 152 } 153 else { 154 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass()); 155 } 156 } 157 158 /** 159 * Schedule a Runnable for execution. 160 * 161 * @param runnable Runnable to schedule for execution. 162 * @param delay delay for first execution since scheduling. 163 * @param interval interval between executions. 164 * @param unit scheduling unit. 165 */ 166 public void schedule(final Runnable runnable, long delay, long interval, Unit unit) { 167 log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]", 168 runnable.getClass(), delay, interval, unit); 169 Runnable r = new Runnable() { 170 public void run() { 171 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 172 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run"); 173 return; 174 } 175 try { 176 runnable.run(); 177 } 178 catch (Exception ex) { 179 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(), 180 ex.getMessage(), ex); 181 } 182 } 183 }; 184 if (!scheduler.isShutdown()) { 185 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(), 186 TimeUnit.MILLISECONDS); 187 } 188 else { 189 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass()); 190 } 191 } 192 193 /** 194 * Schedule a Runnable for execution. 195 * 196 * @param runnable Runnable to schedule for execution. 197 * @param delay the time from now to delay execution. 198 * @param unit scheduling unit. 199 */ 200 public void schedule(final Runnable runnable, long delay, Unit unit) { 201 log.trace("Scheduling runnable [{0}], delay [{1}] in [{2}]", 202 runnable.getClass(), delay, unit); 203 Runnable r = new Runnable() { 204 public void run() { 205 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 206 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run"); 207 return; 208 } 209 try { 210 runnable.run(); 211 } 212 catch (Exception ex) { 213 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(), 214 ex.getMessage(), ex); 215 } 216 } 217 }; 218 if (!scheduler.isShutdown()) { 219 scheduler.schedule(r, delay * unit.getMillis(), TimeUnit.MILLISECONDS); 220 } 221 else { 222 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass()); 223 } 224 } 225 226}