001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.Map.Entry; 029import java.util.Set; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.action.hadoop.PasswordMasker; 034import org.apache.oozie.client.event.Event; 035import org.apache.oozie.client.event.Event.MessageType; 036import org.apache.oozie.client.event.JobEvent; 037import org.apache.oozie.client.event.SLAEvent; 038import org.apache.oozie.event.BundleJobEvent; 039import org.apache.oozie.event.CoordinatorActionEvent; 040import org.apache.oozie.event.CoordinatorJobEvent; 041import org.apache.oozie.event.EventQueue; 042import org.apache.oozie.event.MemoryEventQueue; 043import org.apache.oozie.event.WorkflowActionEvent; 044import org.apache.oozie.event.WorkflowJobEvent; 045import org.apache.oozie.event.listener.JobEventListener; 046import org.apache.oozie.sla.listener.SLAEventListener; 047import org.apache.oozie.util.LogUtils; 048import org.apache.oozie.util.XLog; 049 050/** 051 * Service class that handles the events system - creating events queue, 052 * managing configured properties and managing and invoking various event 053 * listeners via worker threads 054 */ 055public class EventHandlerService implements Service { 056 057 public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService."; 058 public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size"; 059 public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue"; 060 public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners"; 061 public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types"; 062 public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size"; 063 public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads"; 064 public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval"; 065 066 private static EventQueue eventQueue; 067 private XLog LOG; 068 private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>(); 069 private Set<String> apptypes; 070 private static boolean eventsEnabled = false; 071 private int numWorkers; 072 073 @Override 074 public void init(Services services) throws ServiceException { 075 try { 076 Configuration conf = services.getConf(); 077 LOG = XLog.getLog(getClass()); 078 Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) ConfigurationService.getClass 079 (conf, CONF_EVENT_QUEUE); 080 eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance(); 081 eventQueue.init(conf); 082 // initialize app-types to switch on events for 083 initApptypes(conf); 084 // initialize event listeners 085 initEventListeners(conf); 086 // initialize worker threads via Scheduler 087 initWorkerThreads(conf, services); 088 eventsEnabled = true; 089 LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}]," 090 + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass() 091 .getName(), listenerMap.toString(), apptypes, numWorkers); 092 } 093 catch (Exception ex) { 094 throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex); 095 } 096 } 097 098 private void initApptypes(Configuration conf) { 099 apptypes = new HashSet<String>(); 100 for (String jobtype : ConfigurationService.getStrings(conf, CONF_FILTER_APP_TYPES)) { 101 String tmp = jobtype.trim().toLowerCase(); 102 if (tmp.length() == 0) { 103 continue; 104 } 105 apptypes.add(tmp); 106 } 107 } 108 109 private void initEventListeners(Configuration conf) throws Exception { 110 Class<?>[] listenerClass = ConfigurationService.getClasses(conf, CONF_LISTENERS); 111 for (int i = 0; i < listenerClass.length; i++) { 112 Object listener = null; 113 try { 114 listener = listenerClass[i].newInstance(); 115 } 116 catch (InstantiationException e) { 117 LOG.warn("Could not create event listener instance, " + e); 118 } 119 catch (IllegalAccessException e) { 120 LOG.warn("Illegal access to event listener instance, " + e); 121 } 122 addEventListener(listener, conf, listenerClass[i].getName()); 123 } 124 } 125 126 @SuppressWarnings({ "rawtypes", "unchecked" }) 127 public void addEventListener(Object listener, Configuration conf, String name) throws Exception { 128 if (listener instanceof JobEventListener) { 129 List listenersList = listenerMap.get(MessageType.JOB); 130 if (listenersList == null) { 131 listenersList = new ArrayList(); 132 listenerMap.put(MessageType.JOB, listenersList); 133 } 134 listenersList.add(listener); 135 ((JobEventListener) listener).init(conf); 136 } 137 else if (listener instanceof SLAEventListener) { 138 List listenersList = listenerMap.get(MessageType.SLA); 139 if (listenersList == null) { 140 listenersList = new ArrayList(); 141 listenerMap.put(MessageType.SLA, listenersList); 142 } 143 listenersList.add(listener); 144 ((SLAEventListener) listener).init(conf); 145 } 146 else { 147 LOG.warn("Event listener [{0}] is of undefined type", name); 148 } 149 } 150 151 public static boolean isEnabled() { 152 return eventsEnabled; 153 } 154 155 private void initWorkerThreads(Configuration conf, Services services) throws ServiceException { 156 numWorkers = ConfigurationService.getInt(conf, CONF_WORKER_THREADS); 157 int interval = ConfigurationService.getInt(conf, CONF_WORKER_INTERVAL); 158 SchedulerService ss = services.get(SchedulerService.class); 159 int available = ss.getSchedulableThreads(conf); 160 if (numWorkers + 3 > available) { 161 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested [" 162 + numWorkers + "] cannot be handled with current settings. Increase " 163 + SchedulerService.SCHEDULER_THREADS); 164 } 165 Runnable eventWorker = new EventWorker(); 166 // schedule staggered runnables every 1 min interval by default 167 for (int i = 0; i < numWorkers; i++) { 168 ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC); 169 } 170 } 171 172 @Override 173 public void destroy() { 174 eventsEnabled = false; 175 176 for (Entry<MessageType, List<?>> entry : listenerMap.entrySet()) { 177 List<?> listeners = entry.getValue(); 178 MessageType type = entry.getKey(); 179 180 for (Object listener : listeners) { 181 if (type == MessageType.JOB) { 182 ((JobEventListener) listener).destroy(); 183 } else if (type == MessageType.SLA) { 184 ((SLAEventListener) listener).destroy(); 185 } 186 } 187 } 188 } 189 190 @Override 191 public Class<? extends Service> getInterface() { 192 return EventHandlerService.class; 193 } 194 195 public boolean isSupportedApptype(String appType) { 196 if (!apptypes.contains(appType.toLowerCase())) { 197 return false; 198 } 199 return true; 200 } 201 202 public void setAppTypes(Set<String> types) { 203 apptypes = types; 204 } 205 206 public Set<String> getAppTypes() { 207 return apptypes; 208 } 209 210 public String listEventListeners() { 211 return listenerMap.toString(); 212 } 213 214 public void queueEvent(Event event) { 215 LOG = LogUtils.setLogPrefix(LOG, event); 216 LOG.debug("Queueing event : {0}", event); 217 LOG.trace("Stack trace while queueing event : {0}", event, new Throwable()); 218 eventQueue.add(event); 219 LogUtils.clearLogPrefix(); 220 } 221 222 public EventQueue getEventQueue() { 223 return eventQueue; 224 } 225 226 public class EventWorker implements Runnable { 227 228 @Override 229 public void run() { 230 if (Thread.currentThread().isInterrupted()) { 231 return; 232 } 233 try { 234 if (!eventQueue.isEmpty()) { 235 List<Event> work = eventQueue.pollBatch(); 236 for (Event event : work) { 237 LOG = LogUtils.setLogPrefix(LOG, event); 238 LOG.debug("Processing event : {0}", event); 239 MessageType msgType = event.getMsgType(); 240 List<?> listeners = listenerMap.get(msgType); 241 if (listeners != null) { 242 Iterator<?> iter = listeners.iterator(); 243 while (iter.hasNext()) { 244 try { 245 if (msgType == MessageType.JOB) { 246 invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event); 247 } 248 else if (msgType == MessageType.SLA) { 249 invokeSLAEventListener((SLAEventListener) iter.next(), (SLAEvent) event); 250 } 251 else { 252 iter.next(); 253 } 254 } 255 catch (Throwable error) { 256 XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ", 257 error); 258 XLog.getLog(EventHandlerService.class).warn("Throwable in EventWorker thread run. " + 259 "Error message: {0}", 260 new PasswordMasker().maskPasswordsIfNecessary(error.getMessage())); 261 } 262 } 263 } 264 } 265 } 266 } 267 catch (Throwable error) { 268 XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ", 269 error); 270 XLog.getLog(EventHandlerService.class).warn("Throwable in EventWorker thread run. " + 271 "Error message: {0}", 272 new PasswordMasker().maskPasswordsIfNecessary(error.getMessage())); 273 } 274 } 275 276 private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) { 277 switch (event.getAppType()) { 278 case WORKFLOW_JOB: 279 jobListener.onWorkflowJobEvent((WorkflowJobEvent)event); 280 break; 281 case WORKFLOW_ACTION: 282 jobListener.onWorkflowActionEvent((WorkflowActionEvent)event); 283 break; 284 case COORDINATOR_JOB: 285 jobListener.onCoordinatorJobEvent((CoordinatorJobEvent)event); 286 break; 287 case COORDINATOR_ACTION: 288 jobListener.onCoordinatorActionEvent((CoordinatorActionEvent)event); 289 break; 290 case BUNDLE_JOB: 291 jobListener.onBundleJobEvent((BundleJobEvent)event); 292 break; 293 default: 294 XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}", 295 event.getAppType()); 296 } 297 } 298 299 private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) { 300 switch (event.getEventStatus()) { 301 case START_MET: 302 slaListener.onStartMet(event); 303 break; 304 case START_MISS: 305 slaListener.onStartMiss(event); 306 break; 307 case END_MET: 308 slaListener.onEndMet(event); 309 break; 310 case END_MISS: 311 slaListener.onEndMiss(event); 312 break; 313 case DURATION_MET: 314 slaListener.onDurationMet(event); 315 break; 316 case DURATION_MISS: 317 slaListener.onDurationMiss(event); 318 break; 319 default: 320 XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getSLAStatus()); 321 } 322 } 323 } 324 325}