001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import org.apache.oozie.client.OozieClient.SYSTEM_MODE; 021 import org.apache.oozie.util.XLog; 022 023 import java.util.concurrent.Callable; 024 import java.util.concurrent.ScheduledExecutorService; 025 import java.util.concurrent.ScheduledThreadPoolExecutor; 026 import java.util.concurrent.TimeUnit; 027 028 /** 029 * This service executes scheduled Runnables and Callables at regular intervals. <p/> It uses a 030 * java.util.concurrent.ScheduledExecutorService. <p/> The {@link #SCHEDULER_THREADS} configuration property indicates 031 * how many threads the scheduler will use to run scheduled commands. 032 */ 033 public class SchedulerService implements Service { 034 035 public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchedulerService."; 036 037 public static final String SCHEDULER_THREADS = CONF_PREFIX + "threads"; 038 039 private final XLog log = XLog.getLog(getClass()); 040 041 private ScheduledExecutorService scheduler; 042 043 /** 044 * Initialize the scheduler service. 045 * 046 * @param services services instance. 047 */ 048 @Override 049 public void init(Services services) { 050 scheduler = new ScheduledThreadPoolExecutor(services.getConf().getInt(SCHEDULER_THREADS, 5)); 051 } 052 053 /** 054 * Destroy the scheduler service. 055 */ 056 @Override 057 public void destroy() { 058 try { 059 long limit = System.currentTimeMillis() + 30 * 1000;// 30 seconds 060 scheduler.shutdownNow(); 061 while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) { 062 log.info("Waiting for scheduler to shutdown"); 063 if (System.currentTimeMillis() > limit) { 064 log.warn("Gave up, continuing without waiting for scheduler to shutdown"); 065 break; 066 } 067 } 068 } 069 catch (InterruptedException ex) { 070 log.warn(ex); 071 } 072 } 073 074 /** 075 * Return the public interface for scheduler service. 076 * 077 * @return {@link SchedulerService}. 078 */ 079 @Override 080 public Class<? extends Service> getInterface() { 081 return SchedulerService.class; 082 } 083 084 /** 085 * Return the java.util.concurrent.ScheduledExecutorService instance used by the SchedulerService. <p/> 086 * 087 * @return the scheduled executor service instance. 088 */ 089 public ScheduledExecutorService getScheduler() { 090 return scheduler; 091 } 092 093 public enum Unit { 094 MILLISEC(1), 095 SEC(1000), 096 MIN(1000 * 60), 097 HOUR(1000 * 60 * 60); 098 099 private long millis; 100 101 private Unit(long millis) { 102 this.millis = millis; 103 } 104 105 private long getMillis() { 106 return millis; 107 } 108 109 } 110 111 /** 112 * Schedule a Callable for execution. 113 * 114 * @param callable callable to schedule for execution. 115 * @param delay delay for first execution since scheduling. 116 * @param interval interval between executions. 117 * @param unit scheduling unit. 118 */ 119 public void schedule(final Callable<Void> callable, long delay, long interval, Unit unit) { 120 log.trace("Scheduling callable [{0}], interval [{1}] seconds, delay [{2}] in [{3}]", 121 callable.getClass(), delay, interval, unit); 122 Runnable r = new Runnable() { 123 public void run() { 124 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 125 log.trace("schedule[run/callable] System is in SAFEMODE. Therefore nothing will run"); 126 return; 127 } 128 try { 129 callable.call(); 130 } 131 catch (Exception ex) { 132 log.warn("Error executing callable [{0}], {1}", callable.getClass().getSimpleName(), 133 ex.getMessage(), ex); 134 } 135 } 136 }; 137 if (!scheduler.isShutdown()) { 138 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(), 139 TimeUnit.MILLISECONDS); 140 } 141 else { 142 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", callable.getClass()); 143 } 144 } 145 146 /** 147 * Schedule a Runnable for execution. 148 * 149 * @param runnable Runnable to schedule for execution. 150 * @param delay delay for first execution since scheduling. 151 * @param interval interval between executions. 152 * @param unit scheduling unit. 153 */ 154 public void schedule(final Runnable runnable, long delay, long interval, Unit unit) { 155 log.trace("Scheduling runnable [{0}], interval [{1}], delay [{2}] in [{3}]", 156 runnable.getClass(), delay, interval, unit); 157 Runnable r = new Runnable() { 158 public void run() { 159 if (Services.get().getSystemMode() == SYSTEM_MODE.SAFEMODE) { 160 log.trace("schedule[run/Runnable] System is in SAFEMODE. Therefore nothing will run"); 161 return; 162 } 163 try { 164 runnable.run(); 165 } 166 catch (Exception ex) { 167 log.warn("Error executing runnable [{0}], {1}", runnable.getClass().getSimpleName(), 168 ex.getMessage(), ex); 169 } 170 } 171 }; 172 if (!scheduler.isShutdown()) { 173 scheduler.scheduleWithFixedDelay(r, delay * unit.getMillis(), interval * unit.getMillis(), 174 TimeUnit.MILLISECONDS); 175 } 176 else { 177 log.warn("Scheduler shutting down, ignoring scheduling of [{0}]", runnable.getClass()); 178 } 179 } 180 181 }