/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.Event.MessageType;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.event.BundleJobEvent;
import org.apache.oozie.event.CoordinatorActionEvent;
import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.event.EventQueue;
import org.apache.oozie.event.MemoryEventQueue;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.sla.listener.SLAEventListener;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;

/**
 * Service that runs the events system: it creates the event queue, reads the
 * configured properties (application-type filter, listeners, worker settings),
 * and schedules worker runnables that drain the queue and hand each event to
 * the registered job and SLA listeners.
 */
public class EventHandlerService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
    public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
    public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
    public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types";
    public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
    public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads";
    public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";

    private static EventQueue eventQueue;
    // Not final: it is replaced with a prefixed logger each time an event is queued or processed.
    private XLog LOG;
    // Listeners grouped by the message type they handle (only JOB and SLA are ever registered here).
    private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
    // Lower-cased application types for which events are switched on.
    private Set<String> apptypes;
    private static boolean eventsEnabled = false;
    private int numWorkers;

    /**
     * Creates and initializes the event queue, the application-type filter, the
     * event listeners and the worker runnables, then marks events as enabled.
     *
     * @param services the services instance supplying the configuration and the scheduler
     * @throws ServiceException (E0100) wrapping any exception raised during initialization
     */
    @Override
    public void init(Services services) throws ServiceException {
        try {
            Configuration conf = services.getConf();
            LOG = XLog.getLog(getClass());
            @SuppressWarnings("unchecked")
            Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) ConfigurationService.getClass(
                    conf, CONF_EVENT_QUEUE);
            // fall back to the in-memory queue when no implementation is configured
            eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
            eventQueue.init(conf);
            // initialize app-types to switch on events for
            initApptypes(conf);
            // initialize event listeners
            initEventListeners(conf);
            // initialize worker threads via Scheduler
            initWorkerThreads(conf, services);
            eventsEnabled = true;
            LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}],"
                    + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]",
                    eventQueue.getClass().getName(), listenerMap.toString(), apptypes, numWorkers);
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex);
        }
    }

    /**
     * Builds the set of application types from {@link #CONF_FILTER_APP_TYPES}:
     * each configured value is trimmed and lower-cased, and blank entries are dropped.
     */
    private void initApptypes(Configuration conf) {
        apptypes = new HashSet<String>();
        for (String jobtype : ConfigurationService.getStrings(conf, CONF_FILTER_APP_TYPES)) {
            String normalized = jobtype.trim().toLowerCase();
            if (normalized.length() > 0) {
                apptypes.add(normalized);
            }
        }
    }

    /**
     * Instantiates every class listed under {@link #CONF_LISTENERS} and registers it.
     * A class that cannot be instantiated is logged and skipped, so it is not also
     * reported as a listener "of undefined type".
     *
     * @throws Exception if registering (initializing) a listener fails
     */
    private void initEventListeners(Configuration conf) throws Exception {
        Class<?>[] listenerClasses = ConfigurationService.getClasses(conf, CONF_LISTENERS);
        for (Class<?> listenerClass : listenerClasses) {
            Object listener;
            try {
                listener = listenerClass.newInstance();
            }
            catch (InstantiationException e) {
                LOG.warn("Could not create event listener instance, " + e);
                continue;
            }
            catch (IllegalAccessException e) {
                LOG.warn("Illegal access to event listener instance, " + e);
                continue;
            }
            addEventListener(listener, conf, listenerClass.getName());
        }
    }

    /**
     * Registers a listener under the message type it handles and initializes it.
     * A {@link JobEventListener} is stored under {@code JOB}, an
     * {@link SLAEventListener} under {@code SLA}; anything else (including
     * {@code null}) is only logged as a warning.
     *
     * @param listener the listener instance to register
     * @param conf configuration passed to the listener's {@code init}
     * @param name name used in the warning when the listener type is not recognized
     * @throws Exception declared for callers; nothing here catches what the listener's {@code init} throws
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public void addEventListener(Object listener, Configuration conf, String name) throws Exception {
        if (listener instanceof JobEventListener) {
            // the listener is registered before it is initialized, as before
            listenersFor(MessageType.JOB).add(listener);
            ((JobEventListener) listener).init(conf);
        }
        else if (listener instanceof SLAEventListener) {
            listenersFor(MessageType.SLA).add(listener);
            ((SLAEventListener) listener).init(conf);
        }
        else {
            LOG.warn("Event listener [{0}] is of undefined type", name);
        }
    }

    /**
     * Returns the listener list registered for the given message type, creating
     * and storing an empty list on first use.
     */
    @SuppressWarnings("rawtypes")
    private List listenersFor(MessageType type) {
        List listeners = listenerMap.get(type);
        if (listeners == null) {
            listeners = new ArrayList();
            listenerMap.put(type, listeners);
        }
        return listeners;
    }

    /**
     * @return {@code true} once {@link #init(Services)} has completed, {@code false}
     *         before that and after {@link #destroy()}
     */
    public static boolean isEnabled() {
        return eventsEnabled;
    }

    /**
     * Reads the worker count and interval from the configuration and schedules
     * one shared {@link EventWorker} that many times on the scheduler service.
     *
     * @throws ServiceException (E0100) when the scheduler reports too few schedulable threads
     */
    private void initWorkerThreads(Configuration conf, Services services) throws ServiceException {
        numWorkers = ConfigurationService.getInt(conf, CONF_WORKER_THREADS);
        int interval = ConfigurationService.getInt(conf, CONF_WORKER_INTERVAL);
        SchedulerService ss = services.get(SchedulerService.class);
        int available = ss.getSchedulableThreads(conf);
        // NOTE(review): the extra 3 presumably keeps scheduler threads free for other tasks - confirm
        if (numWorkers + 3 > available) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested ["
                    + numWorkers + "] cannot be handled with current settings. Increase "
                    + SchedulerService.SCHEDULER_THREADS);
        }
        Runnable eventWorker = new EventWorker();
        // stagger the runnables: worker i is scheduled with 10 + i * 20 and the configured interval, in seconds
        // (the original comment states the interval is 1 min by default; that default is not defined in this file)
        for (int i = 0; i < numWorkers; i++) {
            ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC);
        }
    }

    /**
     * Disables events and destroys every registered listener. Entries under a
     * message type other than {@code JOB} or {@code SLA} are skipped.
     */
    @Override
    public void destroy() {
        eventsEnabled = false;
        for (Map.Entry<MessageType, List<?>> entry : listenerMap.entrySet()) {
            MessageType type = entry.getKey();
            // for-each always advances, so an unexpected message type can no longer stall the loop
            for (Object listener : entry.getValue()) {
                if (type == MessageType.JOB) {
                    ((JobEventListener) listener).destroy();
                }
                else if (type == MessageType.SLA) {
                    ((SLAEventListener) listener).destroy();
                }
            }
        }
    }

    @Override
    public Class<? extends Service> getInterface() {
        return EventHandlerService.class;
    }

    /**
     * @param appType application type name, compared case-insensitively
     * @return {@code true} if the lower-cased name is in the configured set of application types
     */
    public boolean isSupportedApptype(String appType) {
        return apptypes.contains(appType.toLowerCase());
    }

    /**
     * Replaces the set of application types. The set is stored as given (no copy);
     * since {@link #isSupportedApptype(String)} lower-cases its argument, entries
     * should be lower-case to match.
     */
    public void setAppTypes(Set<String> types) {
        apptypes = types;
    }

    /**
     * @return the set of application types currently held by this service (not a copy)
     */
    public Set<String> getAppTypes() {
        return apptypes;
    }

    /**
     * @return the string form of the message-type to listeners map
     */
    public String listEventListeners() {
        return listenerMap.toString();
    }

    /**
     * Adds an event to the queue, logging it under an event-specific log prefix.
     *
     * @param event the event to queue
     */
    public void queueEvent(Event event) {
        LOG = LogUtils.setLogPrefix(LOG, event);
        try {
            LOG.debug("Queueing event : {0}", event);
            LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
            eventQueue.add(event);
        }
        finally {
            // clear the prefix even when queueing fails
            LogUtils.clearLogPrefix();
        }
    }

    /**
     * @return the event queue created by {@link #init(Services)}
     */
    public EventQueue getEventQueue() {
        return eventQueue;
    }

    /**
     * Runnable that polls one batch from the event queue and hands each event to
     * the listeners registered for its message type. It uses the enclosing
     * service's logger and listener map.
     */
    public class EventWorker implements Runnable {

        /**
         * Processes one batch of events. Returns immediately if the current thread
         * is interrupted or the queue is empty. A failure in one listener is logged
         * at debug level and does not stop the remaining listeners or events.
         */
        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            try {
                if (eventQueue.isEmpty()) {
                    return;
                }
                List<Event> work = eventQueue.pollBatch();
                for (Event event : work) {
                    LOG = LogUtils.setLogPrefix(LOG, event);
                    LOG.debug("Processing event : {0}", event);
                    MessageType msgType = event.getMsgType();
                    List<?> listeners = listenerMap.get(msgType);
                    if (listeners == null) {
                        continue;
                    }
                    for (Object listener : listeners) {
                        try {
                            if (msgType == MessageType.JOB) {
                                invokeJobEventListener((JobEventListener) listener, (JobEvent) event);
                            }
                            else if (msgType == MessageType.SLA) {
                                invokeSLAEventListener((SLAEventListener) listener, (SLAEvent) event);
                            }
                            // listeners under any other message type are skipped
                        }
                        catch (Throwable error) {
                            XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
                                    error);
                        }
                    }
                }
            }
            catch (Throwable error) {
                XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ", error);
            }
        }

        /**
         * Calls the job-listener callback that matches the event's application type;
         * an application type without a callback is logged at info level.
         */
        private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
            switch (event.getAppType()) {
                case WORKFLOW_JOB:
                    jobListener.onWorkflowJobEvent((WorkflowJobEvent) event);
                    break;
                case WORKFLOW_ACTION:
                    jobListener.onWorkflowActionEvent((WorkflowActionEvent) event);
                    break;
                case COORDINATOR_JOB:
                    jobListener.onCoordinatorJobEvent((CoordinatorJobEvent) event);
                    break;
                case COORDINATOR_ACTION:
                    jobListener.onCoordinatorActionEvent((CoordinatorActionEvent) event);
                    break;
                case BUNDLE_JOB:
                    jobListener.onBundleJobEvent((BundleJobEvent) event);
                    break;
                default:
                    XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
                            event.getAppType());
            }
        }

        /**
         * Calls the SLA-listener callback that matches the event's event status;
         * an event status without a callback is logged at info level.
         */
        private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) {
            switch (event.getEventStatus()) {
                case START_MET:
                    slaListener.onStartMet(event);
                    break;
                case START_MISS:
                    slaListener.onStartMiss(event);
                    break;
                case END_MET:
                    slaListener.onEndMet(event);
                    break;
                case END_MISS:
                    slaListener.onEndMiss(event);
                    break;
                case DURATION_MET:
                    slaListener.onDurationMet(event);
                    break;
                case DURATION_MISS:
                    slaListener.onDurationMiss(event);
                    break;
                default:
                    // log the value that was actually switched on
                    XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}",
                            event.getEventStatus());
            }
        }
    }

}