001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 021 import org.apache.hadoop.conf.Configuration; 022 import org.apache.oozie.ErrorCode; 023 import org.apache.oozie.event.BundleJobEvent; 024 import org.apache.oozie.event.CoordinatorActionEvent; 025 import org.apache.oozie.event.CoordinatorJobEvent; 026 import org.apache.oozie.client.event.Event; 027 import org.apache.oozie.client.event.Event.MessageType; 028 import org.apache.oozie.client.event.JobEvent; 029 import org.apache.oozie.event.EventQueue; 030 import org.apache.oozie.event.MemoryEventQueue; 031 import org.apache.oozie.event.WorkflowActionEvent; 032 import org.apache.oozie.event.WorkflowJobEvent; 033 import org.apache.oozie.event.listener.JobEventListener; 034 import org.apache.oozie.sla.listener.SLAEventListener; 035 import org.apache.oozie.client.event.SLAEvent; 036 import org.apache.oozie.util.XLog; 037 038 import java.util.ArrayList; 039 import java.util.HashMap; 040 import java.util.HashSet; 041 import java.util.Iterator; 042 import java.util.List; 043 import java.util.Map; 044 import java.util.Set; 045 046 /** 047 * Service class that handles the events system - creating events queue, 048 * managing configured properties and managing and invoking various event 049 * listeners via worker threads 050 */ 051 public class EventHandlerService implements Service { 052 053 public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService."; 054 public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size"; 055 public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue"; 056 public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners"; 057 public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types"; 058 public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size"; 059 public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads"; 060 public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval"; 061 062 private static EventQueue eventQueue; 063 private XLog LOG; 064 private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>(); 065 private Set<String> apptypes; 066 private static boolean eventsEnabled = false; 067 private int numWorkers; 068 069 @Override 070 public void init(Services services) throws ServiceException { 071 try { 072 Configuration conf = services.getConf(); 073 LOG = XLog.getLog(getClass()); 074 Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null); 075 eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance(); 076 eventQueue.init(conf); 077 // initialize app-types to switch on events for 078 initApptypes(conf); 079 // initialize event listeners 080 initEventListeners(conf); 081 // initialize worker threads via Scheduler 082 initWorkerThreads(conf, services); 083 eventsEnabled = true; 084 LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}]," 085 + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass() 086 .getName(), listenerMap.toString(), apptypes, numWorkers); 087 } 088 catch (Exception ex) { 089 throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex); 090 } 091 } 092 093 private void initApptypes(Configuration conf) { 094 apptypes = new HashSet<String>(); 095 for (String jobtype : conf.getStringCollection(CONF_FILTER_APP_TYPES)) { 096 String tmp = jobtype.trim().toLowerCase(); 097 if (tmp.length() == 0) { 098 continue; 099 } 100 apptypes.add(tmp); 101 } 102 } 103 104 private void initEventListeners(Configuration conf) throws Exception { 105 Class<?>[] listenerClass = conf.getClasses(CONF_LISTENERS, 106 org.apache.oozie.jms.JMSJobEventListener.class, 107 org.apache.oozie.sla.listener.SLAJobEventListener.class); 108 for (int i = 0; i < listenerClass.length; i++) { 109 Object listener = null; 110 try { 111 listener = listenerClass[i].newInstance(); 112 } 113 catch (InstantiationException e) { 114 LOG.warn("Could not create event listener instance, " + e); 115 } 116 catch (IllegalAccessException e) { 117 LOG.warn("Illegal access to event listener instance, " + e); 118 } 119 addEventListener(listener, conf, listenerClass[i].getName()); 120 } 121 } 122 123 @SuppressWarnings({ "rawtypes", "unchecked" }) 124 public void addEventListener(Object listener, Configuration conf, String name) throws Exception { 125 if (listener instanceof JobEventListener) { 126 List listenersList = listenerMap.get(MessageType.JOB); 127 if (listenersList == null) { 128 listenersList = new ArrayList(); 129 listenerMap.put(MessageType.JOB, listenersList); 130 } 131 listenersList.add(listener); 132 ((JobEventListener) listener).init(conf); 133 } 134 else if (listener instanceof SLAEventListener) { 135 List listenersList = listenerMap.get(MessageType.SLA); 136 if (listenersList == null) { 137 listenersList = new ArrayList(); 138 listenerMap.put(MessageType.SLA, listenersList); 139 } 140 listenersList.add(listener); 141 ((SLAEventListener) listener).init(conf); 142 } 143 else { 144 LOG.warn("Event listener [{0}] is of undefined type", name); 145 } 146 } 147 148 public static boolean isEnabled() { 149 return eventsEnabled; 150 } 151 152 private void initWorkerThreads(Configuration conf, Services services) throws ServiceException { 153 numWorkers = conf.getInt(CONF_WORKER_THREADS, 3); 154 int interval = conf.getInt(CONF_WORKER_INTERVAL, 30); 155 SchedulerService ss = services.get(SchedulerService.class); 156 int available = ss.getSchedulableThreads(conf); 157 if (numWorkers + 3 > available) { 158 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested [" 159 + numWorkers + "] cannot be handled with current settings. Increase " 160 + SchedulerService.SCHEDULER_THREADS); 161 } 162 Runnable eventWorker = new EventWorker(); 163 // schedule staggered runnables every 1 min interval by default 164 for (int i = 0; i < numWorkers; i++) { 165 ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC); 166 } 167 } 168 169 @Override 170 public void destroy() { 171 eventsEnabled = false; 172 for (MessageType type : listenerMap.keySet()) { 173 Iterator<?> iter = listenerMap.get(type).iterator(); 174 while (iter.hasNext()) { 175 if (type == MessageType.JOB) { 176 ((JobEventListener) iter.next()).destroy(); 177 } 178 else if (type == MessageType.SLA) { 179 ((SLAEventListener) iter.next()).destroy(); 180 } 181 } 182 } 183 } 184 185 @Override 186 public Class<? extends Service> getInterface() { 187 return EventHandlerService.class; 188 } 189 190 public boolean isSupportedApptype(String appType) { 191 if (!apptypes.contains(appType.toLowerCase())) { 192 return false; 193 } 194 return true; 195 } 196 197 public void setAppTypes(Set<String> types) { 198 apptypes = types; 199 } 200 201 public Set<String> getAppTypes() { 202 return apptypes; 203 } 204 205 public String listEventListeners() { 206 return listenerMap.toString(); 207 } 208 209 public void queueEvent(Event event) { 210 LOG.debug("Queueing event : {0}", event); 211 LOG.trace("Stack trace while queueing event : {0}", event, new Throwable()); 212 eventQueue.add(event); 213 } 214 215 public EventQueue getEventQueue() { 216 return eventQueue; 217 } 218 219 public class EventWorker implements Runnable { 220 221 @Override 222 public void run() { 223 if (Thread.currentThread().isInterrupted()) { 224 return; 225 } 226 try { 227 if (!eventQueue.isEmpty()) { 228 List<Event> work = eventQueue.pollBatch(); 229 for (Event event : work) { 230 LOG.debug("Processing event : {0}", event); 231 MessageType msgType = event.getMsgType(); 232 List<?> listeners = listenerMap.get(msgType); 233 if (listeners != null) { 234 Iterator<?> iter = listeners.iterator(); 235 while (iter.hasNext()) { 236 try { 237 if (msgType == MessageType.JOB) { 238 invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event); 239 } 240 else if (msgType == MessageType.SLA) { 241 invokeSLAEventListener((SLAEventListener) iter.next(), (SLAEvent) event); 242 } 243 else { 244 iter.next(); 245 } 246 } 247 catch (Throwable error) { 248 XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ", 249 error); 250 } 251 } 252 } 253 } 254 } 255 } 256 catch (Throwable error) { 257 XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ", 258 error); 259 } 260 } 261 262 private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) { 263 switch (event.getAppType()) { 264 case WORKFLOW_JOB: 265 jobListener.onWorkflowJobEvent((WorkflowJobEvent)event); 266 break; 267 case WORKFLOW_ACTION: 268 jobListener.onWorkflowActionEvent((WorkflowActionEvent)event); 269 break; 270 case COORDINATOR_JOB: 271 jobListener.onCoordinatorJobEvent((CoordinatorJobEvent)event); 272 break; 273 case COORDINATOR_ACTION: 274 jobListener.onCoordinatorActionEvent((CoordinatorActionEvent)event); 275 break; 276 case BUNDLE_JOB: 277 jobListener.onBundleJobEvent((BundleJobEvent)event); 278 break; 279 default: 280 XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}", 281 event.getAppType()); 282 } 283 } 284 285 private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) { 286 switch (event.getEventStatus()) { 287 case START_MET: 288 slaListener.onStartMet(event); 289 break; 290 case START_MISS: 291 slaListener.onStartMiss(event); 292 break; 293 case END_MET: 294 slaListener.onEndMet(event); 295 break; 296 case END_MISS: 297 slaListener.onEndMiss(event); 298 break; 299 case DURATION_MET: 300 slaListener.onDurationMet(event); 301 break; 302 case DURATION_MISS: 303 slaListener.onDurationMiss(event); 304 break; 305 default: 306 XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getSLAStatus()); 307 } 308 } 309 } 310 311 }