/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;


import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.event.BundleJobEvent;
import org.apache.oozie.event.CoordinatorActionEvent;
import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.Event.MessageType;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.event.EventQueue;
import org.apache.oozie.event.MemoryEventQueue;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.sla.listener.SLAEventListener;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.XLog;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
 * Service class that handles the events system - creating the event queue,
 * managing configured properties, and managing and invoking the various event
 * listeners via worker threads.
 */
public class EventHandlerService implements Service {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
    /** Not read by this class. NOTE(review): presumably consumed by the {@link EventQueue} implementation - confirm. */
    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
    /** Class implementing {@link EventQueue}; {@link MemoryEventQueue} is used when unset. */
    public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
    /** Listener classes to instantiate and register at startup. */
    public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
    /** Collection of application types for which events are switched on. */
    public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types";
    /** Not read by this class. NOTE(review): presumably consumed by the {@link EventQueue} implementation - confirm. */
    public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
    /** Number of event worker runnables to schedule (default 3). */
    public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads";
    /** Interval, in seconds, passed to the scheduler for each worker (default 30). */
    public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";

    private static EventQueue eventQueue;
    private XLog LOG;
    // Listeners grouped by the message type they handle; populated only through addEventListener().
    private final Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
    private Set<String> apptypes;
    private static boolean eventsEnabled = false;
    private int numWorkers;

    /**
     * Creates and initializes the event queue, loads the app-type filter, registers the configured
     * listeners and schedules the worker runnables. Events are flagged as enabled only after all of
     * these steps succeeded.
     *
     * @param services the services singleton providing the configuration and the scheduler
     * @throws ServiceException (E0100) wrapping any exception raised during initialization
     */
    @Override
    public void init(Services services) throws ServiceException {
        try {
            Configuration conf = services.getConf();
            LOG = XLog.getLog(getClass());
            LOG = XLog.resetPrefix(LOG);
            // The configured class is not type-checked here; a class that is not an EventQueue
            // fails on instantiation below and is reported through the catch block.
            @SuppressWarnings("unchecked")
            Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE,
                    null);
            eventQueue = queueImpl == null ? new MemoryEventQueue() : queueImpl.newInstance();
            eventQueue.init(conf);
            // initialize app-types to switch on events for
            initApptypes(conf);
            // initialize event listeners
            initEventListeners(conf);
            // initialize worker threads via Scheduler
            initWorkerThreads(conf, services);
            eventsEnabled = true;
            LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}],"
                    + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass()
                    .getName(), listenerMap.toString(), apptypes, numWorkers);
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex);
        }
    }

    /**
     * Reads {@link #CONF_FILTER_APP_TYPES} into {@link #apptypes}, trimming and lower-casing each
     * entry and dropping blank ones.
     */
    private void initApptypes(Configuration conf) {
        apptypes = new HashSet<String>();
        for (String jobtype : conf.getStringCollection(CONF_FILTER_APP_TYPES)) {
            // Locale.ROOT keeps the normalization independent of the JVM default locale
            // (e.g. the Turkish dotless-i mapping would otherwise break lookups).
            String normalized = jobtype.trim().toLowerCase(Locale.ROOT);
            if (normalized.length() == 0) {
                continue;
            }
            apptypes.add(normalized);
        }
    }

    /**
     * Instantiates every class listed under {@link #CONF_LISTENERS} (falling back to the JMS and SLA
     * job event listeners when the property is unset) and registers the instances.
     * A class that cannot be instantiated is logged and skipped.
     *
     * @throws Exception if registering or initializing a listener fails
     */
    private void initEventListeners(Configuration conf) throws Exception {
        Class<?>[] listenerClasses = conf.getClasses(CONF_LISTENERS,
                org.apache.oozie.jms.JMSJobEventListener.class,
                org.apache.oozie.sla.listener.SLAJobEventListener.class);
        for (Class<?> listenerClass : listenerClasses) {
            Object listener;
            try {
                listener = listenerClass.newInstance();
            }
            catch (InstantiationException e) {
                LOG.warn("Could not create event listener instance, " + e);
                // nothing to register; avoids a second, misleading "undefined type" warning
                continue;
            }
            catch (IllegalAccessException e) {
                LOG.warn("Illegal access to event listener instance, " + e);
                continue;
            }
            addEventListener(listener, conf, listenerClass.getName());
        }
    }

    /**
     * Registers a listener under the message type matching its interface and initializes it.
     * A {@link JobEventListener} is registered for {@link MessageType#JOB}, an
     * {@link SLAEventListener} for {@link MessageType#SLA}; anything else (including {@code null})
     * is only logged as a warning.
     *
     * @param listener the listener instance to register
     * @param conf configuration handed to the listener's {@code init}
     * @param name listener name used in the warning message
     * @throws Exception if the listener's {@code init} fails
     */
    public void addEventListener(Object listener, Configuration conf, String name) throws Exception {
        if (listener instanceof JobEventListener) {
            registerListener(MessageType.JOB, listener);
            ((JobEventListener) listener).init(conf);
        }
        else if (listener instanceof SLAEventListener) {
            registerListener(MessageType.SLA, listener);
            ((SLAEventListener) listener).init(conf);
        }
        else {
            LOG.warn("Event listener [{0}] is of undefined type", name);
        }
    }

    /** Appends the listener to the list kept for the given message type, creating the list on first use. */
    @SuppressWarnings("unchecked")
    private void registerListener(MessageType type, Object listener) {
        // Lists are only ever created here as List<Object>, so the cast is safe.
        List<Object> listeners = (List<Object>) listenerMap.get(type);
        if (listeners == null) {
            listeners = new ArrayList<Object>();
            listenerMap.put(type, listeners);
        }
        listeners.add(listener);
    }

    /**
     * @return true once {@link #init(Services)} completed successfully and until {@link #destroy()} is called
     */
    public static boolean isEnabled() {
        return eventsEnabled;
    }

    /**
     * Schedules {@link #CONF_WORKER_THREADS} runs of a shared {@link EventWorker} on the
     * {@link SchedulerService}.
     *
     * @throws ServiceException (E0100) if the scheduler cannot accommodate the requested workers
     */
    private void initWorkerThreads(Configuration conf, Services services) throws ServiceException {
        numWorkers = conf.getInt(CONF_WORKER_THREADS, 3);
        int interval = conf.getInt(CONF_WORKER_INTERVAL, 30);
        SchedulerService ss = services.get(SchedulerService.class);
        int available = ss.getSchedulableThreads(conf);
        // NOTE(review): the extra 3 threads are presumably headroom for other scheduled tasks - confirm.
        if (numWorkers + 3 > available) {
            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested ["
                    + numWorkers + "] cannot be handled with current settings. Increase "
                    + SchedulerService.SCHEDULER_THREADS);
        }
        Runnable eventWorker = new EventWorker();
        // Stagger the workers: worker i is scheduled with the values 10 + 20 * i and 'interval'
        // (30 by default), both expressed in seconds.
        for (int i = 0; i < numWorkers; i++) {
            ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC);
        }
    }

    /**
     * Flags events as disabled and destroys every registered listener.
     */
    @Override
    public void destroy() {
        eventsEnabled = false;
        for (Map.Entry<MessageType, List<?>> entry : listenerMap.entrySet()) {
            MessageType type = entry.getKey();
            // for-each always advances, so an unexpected message type can no longer stall the loop
            for (Object listener : entry.getValue()) {
                if (type == MessageType.JOB) {
                    ((JobEventListener) listener).destroy();
                }
                else if (type == MessageType.SLA) {
                    ((SLAEventListener) listener).destroy();
                }
            }
        }
    }

    @Override
    public Class<? extends Service> getInterface() {
        return EventHandlerService.class;
    }

    /**
     * Checks whether events are switched on for the given application type (case-insensitive).
     *
     * @param appType application type name; must not be null
     * @return true if the lower-cased name is among the configured app types
     */
    public boolean isSupportedApptype(String appType) {
        return apptypes.contains(appType.toLowerCase(Locale.ROOT));
    }

    /**
     * Replaces the set of application types for which events are switched on. The set is stored
     * as given; {@link #isSupportedApptype(String)} looks names up in lower case.
     */
    public void setAppTypes(Set<String> types) {
        apptypes = types;
    }

    /**
     * @return the set of application types currently held by this service (not a copy)
     */
    public Set<String> getAppTypes() {
        return apptypes;
    }

    /**
     * @return the string form of the message-type to listeners map
     */
    public String listEventListeners() {
        return listenerMap.toString();
    }

    /**
     * Adds an event to the event queue.
     *
     * @param event the event to enqueue
     */
    public void queueEvent(Event event) {
        LOG = LogUtils.setLogPrefix(LOG, event);
        try {
            LOG.debug("Queueing event : {0}", event);
            LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
            eventQueue.add(event);
        }
        finally {
            // clear the prefix even when enqueueing fails
            LogUtils.clearLogPrefix();
        }
    }

    /**
     * @return the event queue created during {@link #init(Services)}
     */
    public EventQueue getEventQueue() {
        return eventQueue;
    }

    /**
     * Runnable that polls one batch from the event queue per run and hands every event to the
     * listeners registered for its message type.
     */
    public class EventWorker implements Runnable {

        /**
         * Polls one batch of events, if the queue is not empty, and dispatches each event to every
         * matching listener. A failure in one listener is logged and does not prevent the remaining
         * listeners or events from being processed; nothing is thrown out of this method.
         */
        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            try {
                if (eventQueue.isEmpty()) {
                    return;
                }
                List<Event> work = eventQueue.pollBatch();
                for (Event event : work) {
                    LOG = LogUtils.setLogPrefix(LOG, event);
                    LOG.debug("Processing event : {0}", event);
                    MessageType msgType = event.getMsgType();
                    List<?> listeners = listenerMap.get(msgType);
                    if (listeners == null) {
                        continue;
                    }
                    for (Object listener : listeners) {
                        try {
                            if (msgType == MessageType.JOB) {
                                invokeJobEventListener((JobEventListener) listener, (JobEvent) event);
                            }
                            else if (msgType == MessageType.SLA) {
                                invokeSLAEventListener((SLAEventListener) listener, (SLAEvent) event);
                            }
                        }
                        catch (Throwable error) {
                            XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
                                    error);
                        }
                    }
                }
            }
            catch (Throwable error) {
                XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
                        error);
            }
        }

        /** Routes a job event to the listener callback matching the event's app type. */
        private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
            switch (event.getAppType()) {
                case WORKFLOW_JOB:
                    jobListener.onWorkflowJobEvent((WorkflowJobEvent) event);
                    break;
                case WORKFLOW_ACTION:
                    jobListener.onWorkflowActionEvent((WorkflowActionEvent) event);
                    break;
                case COORDINATOR_JOB:
                    jobListener.onCoordinatorJobEvent((CoordinatorJobEvent) event);
                    break;
                case COORDINATOR_ACTION:
                    jobListener.onCoordinatorActionEvent((CoordinatorActionEvent) event);
                    break;
                case BUNDLE_JOB:
                    jobListener.onBundleJobEvent((BundleJobEvent) event);
                    break;
                default:
                    XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
                            event.getAppType());
            }
        }

        /** Routes an SLA event to the listener callback matching the event's event status. */
        private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) {
            switch (event.getEventStatus()) {
                case START_MET:
                    slaListener.onStartMet(event);
                    break;
                case START_MISS:
                    slaListener.onStartMiss(event);
                    break;
                case END_MET:
                    slaListener.onEndMet(event);
                    break;
                case END_MISS:
                    slaListener.onEndMiss(event);
                    break;
                case DURATION_MET:
                    slaListener.onDurationMet(event);
                    break;
                case DURATION_MISS:
                    slaListener.onDurationMiss(event);
                    break;
                default:
                    // report the value that was actually switched on
                    XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}",
                            event.getEventStatus());
            }
        }
    }

}