This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020
021 import org.apache.hadoop.conf.Configuration;
022 import org.apache.oozie.ErrorCode;
023 import org.apache.oozie.event.BundleJobEvent;
024 import org.apache.oozie.event.CoordinatorActionEvent;
025 import org.apache.oozie.event.CoordinatorJobEvent;
026 import org.apache.oozie.client.event.Event;
027 import org.apache.oozie.client.event.Event.MessageType;
028 import org.apache.oozie.client.event.JobEvent;
029 import org.apache.oozie.event.EventQueue;
030 import org.apache.oozie.event.MemoryEventQueue;
031 import org.apache.oozie.event.WorkflowActionEvent;
032 import org.apache.oozie.event.WorkflowJobEvent;
033 import org.apache.oozie.event.listener.JobEventListener;
034 import org.apache.oozie.sla.listener.SLAEventListener;
035 import org.apache.oozie.client.event.SLAEvent;
036 import org.apache.oozie.util.XLog;
037
038 import java.util.ArrayList;
039 import java.util.HashMap;
040 import java.util.HashSet;
041 import java.util.Iterator;
042 import java.util.List;
043 import java.util.Map;
044 import java.util.Set;
045
046 /**
047 * Service class that handles the events system - creating events queue,
048 * managing configured properties and managing and invoking various event
049 * listeners via worker threads
050 */
051 public class EventHandlerService implements Service {
052
053 public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
054 public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
055 public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
056 public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
057 public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types";
058 public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
059 public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads";
060 public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";
061
062 private static EventQueue eventQueue;
063 private XLog LOG;
064 private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
065 private Set<String> apptypes;
066 private static boolean eventsEnabled = false;
067 private int numWorkers;
068
069 @Override
070 public void init(Services services) throws ServiceException {
071 try {
072 Configuration conf = services.getConf();
073 LOG = XLog.getLog(getClass());
074 Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null);
075 eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
076 eventQueue.init(conf);
077 // initialize app-types to switch on events for
078 initApptypes(conf);
079 // initialize event listeners
080 initEventListeners(conf);
081 // initialize worker threads via Scheduler
082 initWorkerThreads(conf, services);
083 eventsEnabled = true;
084 LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}],"
085 + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass()
086 .getName(), listenerMap.toString(), apptypes, numWorkers);
087 }
088 catch (Exception ex) {
089 throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex);
090 }
091 }
092
093 private void initApptypes(Configuration conf) {
094 apptypes = new HashSet<String>();
095 for (String jobtype : conf.getStringCollection(CONF_FILTER_APP_TYPES)) {
096 String tmp = jobtype.trim().toLowerCase();
097 if (tmp.length() == 0) {
098 continue;
099 }
100 apptypes.add(tmp);
101 }
102 }
103
104 private void initEventListeners(Configuration conf) throws Exception {
105 Class<?>[] listenerClass = conf.getClasses(CONF_LISTENERS,
106 org.apache.oozie.jms.JMSJobEventListener.class,
107 org.apache.oozie.sla.listener.SLAJobEventListener.class);
108 for (int i = 0; i < listenerClass.length; i++) {
109 Object listener = null;
110 try {
111 listener = listenerClass[i].newInstance();
112 }
113 catch (InstantiationException e) {
114 LOG.warn("Could not create event listener instance, " + e);
115 }
116 catch (IllegalAccessException e) {
117 LOG.warn("Illegal access to event listener instance, " + e);
118 }
119 addEventListener(listener, conf, listenerClass[i].getName());
120 }
121 }
122
123 @SuppressWarnings({ "rawtypes", "unchecked" })
124 public void addEventListener(Object listener, Configuration conf, String name) throws Exception {
125 if (listener instanceof JobEventListener) {
126 List listenersList = listenerMap.get(MessageType.JOB);
127 if (listenersList == null) {
128 listenersList = new ArrayList();
129 listenerMap.put(MessageType.JOB, listenersList);
130 }
131 listenersList.add(listener);
132 ((JobEventListener) listener).init(conf);
133 }
134 else if (listener instanceof SLAEventListener) {
135 List listenersList = listenerMap.get(MessageType.SLA);
136 if (listenersList == null) {
137 listenersList = new ArrayList();
138 listenerMap.put(MessageType.SLA, listenersList);
139 }
140 listenersList.add(listener);
141 ((SLAEventListener) listener).init(conf);
142 }
143 else {
144 LOG.warn("Event listener [{0}] is of undefined type", name);
145 }
146 }
147
148 public static boolean isEnabled() {
149 return eventsEnabled;
150 }
151
152 private void initWorkerThreads(Configuration conf, Services services) throws ServiceException {
153 numWorkers = conf.getInt(CONF_WORKER_THREADS, 3);
154 int interval = conf.getInt(CONF_WORKER_INTERVAL, 30);
155 SchedulerService ss = services.get(SchedulerService.class);
156 int available = ss.getSchedulableThreads(conf);
157 if (numWorkers + 3 > available) {
158 throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested ["
159 + numWorkers + "] cannot be handled with current settings. Increase "
160 + SchedulerService.SCHEDULER_THREADS);
161 }
162 Runnable eventWorker = new EventWorker();
163 // schedule staggered runnables every 1 min interval by default
164 for (int i = 0; i < numWorkers; i++) {
165 ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC);
166 }
167 }
168
169 @Override
170 public void destroy() {
171 eventsEnabled = false;
172 for (MessageType type : listenerMap.keySet()) {
173 Iterator<?> iter = listenerMap.get(type).iterator();
174 while (iter.hasNext()) {
175 if (type == MessageType.JOB) {
176 ((JobEventListener) iter.next()).destroy();
177 }
178 else if (type == MessageType.SLA) {
179 ((SLAEventListener) iter.next()).destroy();
180 }
181 }
182 }
183 }
184
185 @Override
186 public Class<? extends Service> getInterface() {
187 return EventHandlerService.class;
188 }
189
190 public boolean isSupportedApptype(String appType) {
191 if (!apptypes.contains(appType.toLowerCase())) {
192 return false;
193 }
194 return true;
195 }
196
197 public void setAppTypes(Set<String> types) {
198 apptypes = types;
199 }
200
201 public Set<String> getAppTypes() {
202 return apptypes;
203 }
204
205 public String listEventListeners() {
206 return listenerMap.toString();
207 }
208
209 public void queueEvent(Event event) {
210 LOG.debug("Queueing event : {0}", event);
211 LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
212 eventQueue.add(event);
213 }
214
215 public EventQueue getEventQueue() {
216 return eventQueue;
217 }
218
219 public class EventWorker implements Runnable {
220
221 @Override
222 public void run() {
223 if (Thread.currentThread().isInterrupted()) {
224 return;
225 }
226 try {
227 if (!eventQueue.isEmpty()) {
228 List<Event> work = eventQueue.pollBatch();
229 for (Event event : work) {
230 LOG.debug("Processing event : {0}", event);
231 MessageType msgType = event.getMsgType();
232 List<?> listeners = listenerMap.get(msgType);
233 if (listeners != null) {
234 Iterator<?> iter = listeners.iterator();
235 while (iter.hasNext()) {
236 try {
237 if (msgType == MessageType.JOB) {
238 invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);
239 }
240 else if (msgType == MessageType.SLA) {
241 invokeSLAEventListener((SLAEventListener) iter.next(), (SLAEvent) event);
242 }
243 else {
244 iter.next();
245 }
246 }
247 catch (Throwable error) {
248 XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
249 error);
250 }
251 }
252 }
253 }
254 }
255 }
256 catch (Throwable error) {
257 XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
258 error);
259 }
260 }
261
262 private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
263 switch (event.getAppType()) {
264 case WORKFLOW_JOB:
265 jobListener.onWorkflowJobEvent((WorkflowJobEvent)event);
266 break;
267 case WORKFLOW_ACTION:
268 jobListener.onWorkflowActionEvent((WorkflowActionEvent)event);
269 break;
270 case COORDINATOR_JOB:
271 jobListener.onCoordinatorJobEvent((CoordinatorJobEvent)event);
272 break;
273 case COORDINATOR_ACTION:
274 jobListener.onCoordinatorActionEvent((CoordinatorActionEvent)event);
275 break;
276 case BUNDLE_JOB:
277 jobListener.onBundleJobEvent((BundleJobEvent)event);
278 break;
279 default:
280 XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
281 event.getAppType());
282 }
283 }
284
285 private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) {
286 switch (event.getEventStatus()) {
287 case START_MET:
288 slaListener.onStartMet(event);
289 break;
290 case START_MISS:
291 slaListener.onStartMiss(event);
292 break;
293 case END_MET:
294 slaListener.onEndMet(event);
295 break;
296 case END_MISS:
297 slaListener.onEndMiss(event);
298 break;
299 case DURATION_MET:
300 slaListener.onDurationMet(event);
301 break;
302 case DURATION_MISS:
303 slaListener.onDurationMiss(event);
304 break;
305 default:
306 XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getSLAStatus());
307 }
308 }
309 }
310
311 }