001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 022 import org.apache.oozie.util.XLog; 023 024 import java.util.concurrent.Callable; 025 import java.util.concurrent.ScheduledExecutorService; 026 import java.util.concurrent.ScheduledThreadPoolExecutor; 027 import java.util.concurrent.TimeUnit; 028 029 /** 030 * This service executes scheduled Runnables and Callables at regular intervals. <p/> It uses a 031 * java.util.concurrent.ScheduledExecutorService. <p/> The {@link #SCHEDULER_THREADS} configuration property indicates 032 * how many threads the scheduler will use to run scheduled commands. 033 */ 034 public class SchedulerService implements Service { 035 036 public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService."; 037 038 public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads"; 039 040 private final XLog log = XLog.getLog(getClass()); 041 042 private ScheduledExecutorService scheduler; 043 044 /** 045 * Initialize the scheduler service. 046 * 047 * @param services services instance. 048 */ 049 @Override 050 public void init(Services services) { 051 scheduler = new ScheduledThreadPoolExecutor(getSchedulableThreads(services.getConf())); 052 } 053 054 /** 055 * Destroy the scheduler service. 056 */ 057 @Override 058 public void destroy() { 059 try { 060 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds 061 scheduler.shutdownNow(); 062 while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) { 063 log.info("Waiting for scheduler to shutdown"); 064 if (System.currentTimeMillis() > limit) { 065 log.warn("Gave up, continuing without waiting for scheduler to shutdown"); 066 break; 067 } 068 } 069 } 070 catch (InterruptedException ex) { 071 log.warn(ex); 072 } 073 } 074 075 /** 076 * Return the public interface for scheduler service. 077 * 078 * @return {@link SchedulerService}. 079 */ 080 @Override 081 public Class<? extends Service> getInterface() { 082 return SchedulerService.class; 083 } 084 085 /** 086 * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p/> 087 * 088 * @return the scheduled executor service instance. 089 */ 090 public ScheduledExecutorService getScheduler() { 091 return scheduler; 092 } 093 094 /** 095 * Return the number of threads configured with the Scheduler Service 096 * @param conf 097 * @return int num threads 098 */ 099 public int getSchedulableThreads(Configuration conf) { 100 return conf.getInt(SCHEDULER_THREADS, 10); 101 } 102 103 public enum Unit { 104 MILLISEC(1), 105 SEC(1000), 106 MIN(1000 * 60), 107 HOUR(1000 * 60 * 60); 108 109 private long millis; 110 111 private Unit(long millis) { 112 this.millis = millis; 113 } 114 115 private long getMillis() { 116 return millis; 117 } 118 119 } 120 121 /** 122 * Schedule a Callable for execution. 123 * 124 * @param callable callable to schedule for execution. 125 * @param delay delay for first execution since scheduling. 126 * @param interval interval between executions. 127 * @param unit scheduling unit. 128 */ 129 public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) { 130 log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]", 131 callable.getClass(), delay, interval, unit); 132 Runnable r = new Runnable() { 133 public void run() { 134 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 135 log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run"); 136 return; 137 } 138 try { 139 callable.call(); 140 } 141 catch (Exception ex) { 142 log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(), 143 ex.getMessage(), ex); 144 } 145 } 146 }; 147 if (!scheduler.isShutdown()) { 148 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(), 149 TimeUnit.MILLISECONDS); 150 } 151 else { 152 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass()); 153 } 154 } 155 156 /** 157 * Schedule a Runnable for execution. 158 * 159 * @param runnable Runnable to schedule for execution. 160 * @param delay delay for first execution since scheduling. 161 * @param interval interval between executions. 162 * @param unit scheduling unit. 163 */ 164 public void schedule(final Runnable runnable, long delay, long interval, Unit unit) { 165 log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]", 166 runnable.getClass(), delay, interval, unit); 167 Runnable r = new Runnable() { 168 public void run() { 169 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 170 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run"); 171 return; 172 } 173 try { 174 runnable.run(); 175 } 176 catch (Exception ex) { 177 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(), 178 ex.getMessage(), ex); 179 } 180 } 181 }; 182 if (!scheduler.isShutdown()) { 183 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(), 184 TimeUnit.MILLISECONDS); 185 } 186 else { 187 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass()); 188 } 189 } 190 191 /** 192 * Schedule a Runnable for execution. 193 * 194 * @param runnable Runnable to schedule for execution. 195 * @param delay the time from now to delay execution. 196 * @param unit scheduling unit. 197 */ 198 public void schedule(final Runnable runnable, long delay, Unit unit) { 199 log.trace("Scheduling runnable [{0}], delay [{1}] in [{2}]", 200 runnable.getClass(), delay, unit); 201 Runnable r = new Runnable() { 202 public void run() { 203 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 204 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run"); 205 return; 206 } 207 try { 208 runnable.run(); 209 } 210 catch (Exception ex) { 211 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(), 212 ex.getMessage(), ex); 213 } 214 } 215 }; 216 if (!scheduler.isShutdown()) { 217 scheduler.schedule(r, delay * unit.getMillis(), TimeUnit.MILLISECONDS); 218 } 219 else { 220 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass()); 221 } 222 } 223 224 }