001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    
021    import org.apache.hadoop.conf.Configuration;
022    import org.apache.oozie.ErrorCode;
023    import org.apache.oozie.event.BundleJobEvent;
024    import org.apache.oozie.event.CoordinatorActionEvent;
025    import org.apache.oozie.event.CoordinatorJobEvent;
026    import org.apache.oozie.client.event.Event;
027    import org.apache.oozie.client.event.Event.MessageType;
028    import org.apache.oozie.client.event.JobEvent;
029    import org.apache.oozie.event.EventQueue;
030    import org.apache.oozie.event.MemoryEventQueue;
031    import org.apache.oozie.event.WorkflowActionEvent;
032    import org.apache.oozie.event.WorkflowJobEvent;
033    import org.apache.oozie.event.listener.JobEventListener;
034    import org.apache.oozie.sla.listener.SLAEventListener;
035    import org.apache.oozie.client.event.SLAEvent;
036    import org.apache.oozie.util.XLog;
037    
038    import java.util.ArrayList;
039    import java.util.HashMap;
040    import java.util.HashSet;
041    import java.util.Iterator;
042    import java.util.List;
043    import java.util.Map;
044    import java.util.Set;
045    
046    /**
047     * Service class that handles the events system - creating events queue,
048     * managing configured properties and managing and invoking various event
049     * listeners via worker threads
050     */
051    public class EventHandlerService implements Service {
052    
053        public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
054        public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
055        public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
056        public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
057        public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types";
058        public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
059        public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads";
060        public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";
061    
062        private static EventQueue eventQueue;
063        private XLog LOG;
064        private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
065        private Set<String> apptypes;
066        private static boolean eventsEnabled = false;
067        private int numWorkers;
068    
069        @Override
070        public void init(Services services) throws ServiceException {
071            try {
072                Configuration conf = services.getConf();
073                LOG = XLog.getLog(getClass());
074                Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null);
075                eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
076                eventQueue.init(conf);
077                // initialize app-types to switch on events for
078                initApptypes(conf);
079                // initialize event listeners
080                initEventListeners(conf);
081                // initialize worker threads via Scheduler
082                initWorkerThreads(conf, services);
083                eventsEnabled = true;
084                LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}],"
085                        + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass()
086                        .getName(), listenerMap.toString(), apptypes, numWorkers);
087            }
088            catch (Exception ex) {
089                throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex);
090            }
091        }
092    
093        private void initApptypes(Configuration conf) {
094            apptypes = new HashSet<String>();
095            for (String jobtype : conf.getStringCollection(CONF_FILTER_APP_TYPES)) {
096                String tmp = jobtype.trim().toLowerCase();
097                if (tmp.length() == 0) {
098                    continue;
099                }
100                apptypes.add(tmp);
101            }
102        }
103    
104        private void initEventListeners(Configuration conf) throws Exception {
105            Class<?>[] listenerClass = conf.getClasses(CONF_LISTENERS,
106                    org.apache.oozie.jms.JMSJobEventListener.class,
107                    org.apache.oozie.sla.listener.SLAJobEventListener.class);
108            for (int i = 0; i < listenerClass.length; i++) {
109                Object listener = null;
110                try {
111                    listener = listenerClass[i].newInstance();
112                }
113                catch (InstantiationException e) {
114                    LOG.warn("Could not create event listener instance, " + e);
115                }
116                catch (IllegalAccessException e) {
117                    LOG.warn("Illegal access to event listener instance, " + e);
118                }
119                addEventListener(listener, conf, listenerClass[i].getName());
120            }
121        }
122    
123        @SuppressWarnings({ "rawtypes", "unchecked" })
124        public void addEventListener(Object listener, Configuration conf, String name) throws Exception {
125            if (listener instanceof JobEventListener) {
126                List listenersList = listenerMap.get(MessageType.JOB);
127                if (listenersList == null) {
128                    listenersList = new ArrayList();
129                    listenerMap.put(MessageType.JOB, listenersList);
130                }
131                listenersList.add(listener);
132                ((JobEventListener) listener).init(conf);
133            }
134            else if (listener instanceof SLAEventListener) {
135                List listenersList = listenerMap.get(MessageType.SLA);
136                if (listenersList == null) {
137                    listenersList = new ArrayList();
138                    listenerMap.put(MessageType.SLA, listenersList);
139                }
140                listenersList.add(listener);
141                ((SLAEventListener) listener).init(conf);
142            }
143            else {
144                LOG.warn("Event listener [{0}] is of undefined type", name);
145            }
146        }
147    
148        public static boolean isEnabled() {
149            return eventsEnabled;
150        }
151    
152        private void initWorkerThreads(Configuration conf, Services services) throws ServiceException {
153            numWorkers = conf.getInt(CONF_WORKER_THREADS, 3);
154            int interval = conf.getInt(CONF_WORKER_INTERVAL, 30);
155            SchedulerService ss = services.get(SchedulerService.class);
156            int available = ss.getSchedulableThreads(conf);
157            if (numWorkers + 3 > available) {
158                throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested ["
159                        + numWorkers + "] cannot be handled with current settings. Increase "
160                        + SchedulerService.SCHEDULER_THREADS);
161            }
162            Runnable eventWorker = new EventWorker();
163            // schedule staggered runnables every 1 min interval by default
164            for (int i = 0; i < numWorkers; i++) {
165                ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC);
166            }
167        }
168    
169        @Override
170        public void destroy() {
171            eventsEnabled = false;
172            for (MessageType type : listenerMap.keySet()) {
173                Iterator<?> iter = listenerMap.get(type).iterator();
174                while (iter.hasNext()) {
175                    if (type == MessageType.JOB) {
176                        ((JobEventListener) iter.next()).destroy();
177                    }
178                    else if (type == MessageType.SLA) {
179                        ((SLAEventListener) iter.next()).destroy();
180                    }
181                }
182            }
183        }
184    
185        @Override
186        public Class<? extends Service> getInterface() {
187            return EventHandlerService.class;
188        }
189    
190        public boolean isSupportedApptype(String appType) {
191            if (!apptypes.contains(appType.toLowerCase())) {
192                return false;
193            }
194            return true;
195        }
196    
197        public void setAppTypes(Set<String> types) {
198            apptypes = types;
199        }
200    
201        public Set<String> getAppTypes() {
202            return apptypes;
203        }
204    
205        public String listEventListeners() {
206            return listenerMap.toString();
207        }
208    
209        public void queueEvent(Event event) {
210            LOG.debug("Queueing event : {0}", event);
211            LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
212            eventQueue.add(event);
213        }
214    
215        public EventQueue getEventQueue() {
216            return eventQueue;
217        }
218    
219        public class EventWorker implements Runnable {
220    
221            @Override
222            public void run() {
223                if (Thread.currentThread().isInterrupted()) {
224                    return;
225                }
226                try {
227                    if (!eventQueue.isEmpty()) {
228                        List<Event> work = eventQueue.pollBatch();
229                        for (Event event : work) {
230                            LOG.debug("Processing event : {0}", event);
231                            MessageType msgType = event.getMsgType();
232                            List<?> listeners = listenerMap.get(msgType);
233                            if (listeners != null) {
234                                Iterator<?> iter = listeners.iterator();
235                                while (iter.hasNext()) {
236                                    try {
237                                        if (msgType == MessageType.JOB) {
238                                            invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);
239                                        }
240                                        else if (msgType == MessageType.SLA) {
241                                            invokeSLAEventListener((SLAEventListener) iter.next(), (SLAEvent) event);
242                                        }
243                                        else {
244                                            iter.next();
245                                        }
246                                    }
247                                    catch (Throwable error) {
248                                        XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
249                                                error);
250                                    }
251                                }
252                            }
253                        }
254                    }
255                }
256                catch (Throwable error) {
257                    XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
258                            error);
259                }
260            }
261    
262            private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
263                switch (event.getAppType()) {
264                    case WORKFLOW_JOB:
265                        jobListener.onWorkflowJobEvent((WorkflowJobEvent)event);
266                        break;
267                    case WORKFLOW_ACTION:
268                        jobListener.onWorkflowActionEvent((WorkflowActionEvent)event);
269                        break;
270                    case COORDINATOR_JOB:
271                        jobListener.onCoordinatorJobEvent((CoordinatorJobEvent)event);
272                        break;
273                    case COORDINATOR_ACTION:
274                        jobListener.onCoordinatorActionEvent((CoordinatorActionEvent)event);
275                        break;
276                    case BUNDLE_JOB:
277                        jobListener.onBundleJobEvent((BundleJobEvent)event);
278                        break;
279                    default:
280                        XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
281                                event.getAppType());
282                }
283            }
284    
285            private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) {
286                switch (event.getEventStatus()) {
287                    case START_MET:
288                        slaListener.onStartMet(event);
289                        break;
290                    case START_MISS:
291                        slaListener.onStartMiss(event);
292                        break;
293                    case END_MET:
294                        slaListener.onEndMet(event);
295                        break;
296                    case END_MISS:
297                        slaListener.onEndMiss(event);
298                        break;
299                    case DURATION_MET:
300                        slaListener.onDurationMet(event);
301                        break;
302                    case DURATION_MISS:
303                        slaListener.onDurationMiss(event);
304                        break;
305                    default:
306                        XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getSLAStatus());
307                }
308            }
309        }
310    
311    }