001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 023import org.apache.oozie.util.XLog; 024 025import java.util.concurrent.Callable; 026import java.util.concurrent.ScheduledExecutorService; 027import java.util.concurrent.ScheduledThreadPoolExecutor; 028import java.util.concurrent.TimeUnit; 029 030/** 031 * This service executes scheduled Runnables and Callables at regular intervals. <p/> It uses a 032 * java.util.concurrent.ScheduledExecutorService. <p/> The {@link #SCHEDULER_THREADS} configuration property indicates 033 * how many threads the scheduler will use to run scheduled commands. 034 */ 035public class SchedulerService implements Service { 036 037 public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService."; 038 039 public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads"; 040 041 private final XLog log = XLog.getLog(getClass()); 042 043 private ScheduledExecutorService scheduler; 044 045 /** 046 * Initialize the scheduler service. 047 * 048 * @param services services instance. 049 */ 050 @Override 051 public void init(Services services) { 052 scheduler = new ScheduledThreadPoolExecutor(getSchedulableThreads(services.getConf())); 053 } 054 055 /** 056 * Destroy the scheduler service. 057 */ 058 @Override 059 public void destroy() { 060 try { 061 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds 062 scheduler.shutdownNow(); 063 while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) { 064 log.info("Waiting for scheduler to shutdown"); 065 if (System.currentTimeMillis() > limit) { 066 log.warn("Gave up, continuing without waiting for scheduler to shutdown"); 067 break; 068 } 069 } 070 } 071 catch (InterruptedException ex) { 072 log.warn(ex); 073 } 074 } 075 076 /** 077 * Return the public interface for scheduler service. 078 * 079 * @return {@link SchedulerService}. 080 */ 081 @Override 082 public Class<? extends Service> getInterface() { 083 return SchedulerService.class; 084 } 085 086 /** 087 * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p/> 088 * 089 * @return the scheduled executor service instance. 090 */ 091 public ScheduledExecutorService getScheduler() { 092 return scheduler; 093 } 094 095 /** 096 * Return the number of threads configured with the Scheduler Service 097 * @param conf 098 * @return int num threads 099 */ 100 public int getSchedulableThreads(Configuration conf) { 101 return ConfigurationService.getInt(conf, SCHEDULER_THREADS); 102 } 103 104 public enum Unit { 105 MILLISEC(1), 106 SEC(1000), 107 MIN(1000 * 60), 108 HOUR(1000 * 60 * 60); 109 110 private long millis; 111 112 private Unit(long millis) { 113 this.millis = millis; 114 } 115 116 private long getMillis() { 117 return millis; 118 } 119 120 } 121 122 /** 123 * Schedule a Callable for execution. 124 * 125 * @param callable callable to schedule for execution. 126 * @param delay delay for first execution since scheduling. 127 * @param interval interval between executions. 128 * @param unit scheduling unit. 129 */ 130 public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) { 131 log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]", 132 callable.getClass(), delay, interval, unit); 133 Runnable r = new Runnable() { 134 public void run() { 135 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 136 log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run"); 137 return; 138 } 139 try { 140 callable.call(); 141 } 142 catch (Exception ex) { 143 log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(), 144 ex.getMessage(), ex); 145 } 146 } 147 }; 148 if (!scheduler.isShutdown()) { 149 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(), 150 TimeUnit.MILLISECONDS); 151 } 152 else { 153 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass()); 154 } 155 } 156 157 /** 158 * Schedule a Runnable for execution. 159 * 160 * @param runnable Runnable to schedule for execution. 161 * @param delay delay for first execution since scheduling. 162 * @param interval interval between executions. 163 * @param unit scheduling unit. 164 */ 165 public void schedule(final Runnable runnable, long delay, long interval, Unit unit) { 166 log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]", 167 runnable.getClass(), delay, interval, unit); 168 Runnable r = new Runnable() { 169 public void run() { 170 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 171 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run"); 172 return; 173 } 174 try { 175 runnable.run(); 176 } 177 catch (Exception ex) { 178 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(), 179 ex.getMessage(), ex); 180 } 181 } 182 }; 183 if (!scheduler.isShutdown()) { 184 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(), 185 TimeUnit.MILLISECONDS); 186 } 187 else { 188 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass()); 189 } 190 } 191 192 /** 193 * Schedule a Runnable for execution. 194 * 195 * @param runnable Runnable to schedule for execution. 196 * @param delay the time from now to delay execution. 197 * @param unit scheduling unit. 198 */ 199 public void schedule(final Runnable runnable, long delay, Unit unit) { 200 log.trace("Scheduling runnable [{0}], delay [{1}] in [{2}]", 201 runnable.getClass(), delay, unit); 202 Runnable r = new Runnable() { 203 public void run() { 204 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 205 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run"); 206 return; 207 } 208 try { 209 runnable.run(); 210 } 211 catch (Exception ex) { 212 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(), 213 ex.getMessage(), ex); 214 } 215 } 216 }; 217 if (!scheduler.isShutdown()) { 218 scheduler.schedule(r, delay * unit.getMillis(), TimeUnit.MILLISECONDS); 219 } 220 else { 221 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass()); 222 } 223 } 224 225}