001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.client.event.Event;
033import org.apache.oozie.client.event.Event.MessageType;
034import org.apache.oozie.client.event.JobEvent;
035import org.apache.oozie.client.event.SLAEvent;
036import org.apache.oozie.event.BundleJobEvent;
037import org.apache.oozie.event.CoordinatorActionEvent;
038import org.apache.oozie.event.CoordinatorJobEvent;
039import org.apache.oozie.event.EventQueue;
040import org.apache.oozie.event.MemoryEventQueue;
041import org.apache.oozie.event.WorkflowActionEvent;
042import org.apache.oozie.event.WorkflowJobEvent;
043import org.apache.oozie.event.listener.JobEventListener;
044import org.apache.oozie.sla.listener.SLAEventListener;
045import org.apache.oozie.util.LogUtils;
046import org.apache.oozie.util.XLog;
047
048/**
049 * Service class that handles the events system - creating events queue,
050 * managing configured properties and managing and invoking various event
051 * listeners via worker threads
052 */
053public class EventHandlerService implements Service {
054
055    public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
056    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
057    public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
058    public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
059    public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types";
060    public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
061    public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads";
062    public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";
063
064    private static EventQueue eventQueue;
065    private XLog LOG;
066    private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
067    private Set<String> apptypes;
068    private static boolean eventsEnabled = false;
069    private int numWorkers;
070
071    @Override
072    public void init(Services services) throws ServiceException {
073        try {
074            Configuration conf = services.getConf();
075            LOG = XLog.getLog(getClass());
076            Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) ConfigurationService.getClass
077                    (conf, CONF_EVENT_QUEUE);
078            eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
079            eventQueue.init(conf);
080            // initialize app-types to switch on events for
081            initApptypes(conf);
082            // initialize event listeners
083            initEventListeners(conf);
084            // initialize worker threads via Scheduler
085            initWorkerThreads(conf, services);
086            eventsEnabled = true;
087            LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}],"
088                    + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass()
089                    .getName(), listenerMap.toString(), apptypes, numWorkers);
090        }
091        catch (Exception ex) {
092            throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex);
093        }
094    }
095
096    private void initApptypes(Configuration conf) {
097        apptypes = new HashSet<String>();
098        for (String jobtype : ConfigurationService.getStrings(conf, CONF_FILTER_APP_TYPES)) {
099            String tmp = jobtype.trim().toLowerCase();
100            if (tmp.length() == 0) {
101                continue;
102            }
103            apptypes.add(tmp);
104        }
105    }
106
107    private void initEventListeners(Configuration conf) throws Exception {
108        Class<?>[] listenerClass = ConfigurationService.getClasses(conf, CONF_LISTENERS);
109        for (int i = 0; i < listenerClass.length; i++) {
110            Object listener = null;
111            try {
112                listener = listenerClass[i].newInstance();
113            }
114            catch (InstantiationException e) {
115                LOG.warn("Could not create event listener instance, " + e);
116            }
117            catch (IllegalAccessException e) {
118                LOG.warn("Illegal access to event listener instance, " + e);
119            }
120            addEventListener(listener, conf, listenerClass[i].getName());
121        }
122    }
123
124    @SuppressWarnings({ "rawtypes", "unchecked" })
125    public void addEventListener(Object listener, Configuration conf, String name) throws Exception {
126        if (listener instanceof JobEventListener) {
127            List listenersList = listenerMap.get(MessageType.JOB);
128            if (listenersList == null) {
129                listenersList = new ArrayList();
130                listenerMap.put(MessageType.JOB, listenersList);
131            }
132            listenersList.add(listener);
133            ((JobEventListener) listener).init(conf);
134        }
135        else if (listener instanceof SLAEventListener) {
136            List listenersList = listenerMap.get(MessageType.SLA);
137            if (listenersList == null) {
138                listenersList = new ArrayList();
139                listenerMap.put(MessageType.SLA, listenersList);
140            }
141            listenersList.add(listener);
142            ((SLAEventListener) listener).init(conf);
143        }
144        else {
145            LOG.warn("Event listener [{0}] is of undefined type", name);
146        }
147    }
148
149    public static boolean isEnabled() {
150        return eventsEnabled;
151    }
152
153    private void initWorkerThreads(Configuration conf, Services services) throws ServiceException {
154        numWorkers = ConfigurationService.getInt(conf, CONF_WORKER_THREADS);
155        int interval = ConfigurationService.getInt(conf, CONF_WORKER_INTERVAL);
156        SchedulerService ss = services.get(SchedulerService.class);
157        int available = ss.getSchedulableThreads(conf);
158        if (numWorkers + 3 > available) {
159            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested ["
160                    + numWorkers + "] cannot be handled with current settings. Increase "
161                    + SchedulerService.SCHEDULER_THREADS);
162        }
163        Runnable eventWorker = new EventWorker();
164        // schedule staggered runnables every 1 min interval by default
165        for (int i = 0; i < numWorkers; i++) {
166            ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC);
167        }
168    }
169
170    @Override
171    public void destroy() {
172        eventsEnabled = false;
173        for (MessageType type : listenerMap.keySet()) {
174            Iterator<?> iter = listenerMap.get(type).iterator();
175            while (iter.hasNext()) {
176                if (type == MessageType.JOB) {
177                    ((JobEventListener) iter.next()).destroy();
178                }
179                else if (type == MessageType.SLA) {
180                    ((SLAEventListener) iter.next()).destroy();
181                }
182            }
183        }
184    }
185
186    @Override
187    public Class<? extends Service> getInterface() {
188        return EventHandlerService.class;
189    }
190
191    public boolean isSupportedApptype(String appType) {
192        if (!apptypes.contains(appType.toLowerCase())) {
193            return false;
194        }
195        return true;
196    }
197
198    public void setAppTypes(Set<String> types) {
199        apptypes = types;
200    }
201
202    public Set<String> getAppTypes() {
203        return apptypes;
204    }
205
206    public String listEventListeners() {
207        return listenerMap.toString();
208    }
209
210    public void queueEvent(Event event) {
211        LOG = LogUtils.setLogPrefix(LOG, event);
212        LOG.debug("Queueing event : {0}", event);
213        LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
214        eventQueue.add(event);
215        LogUtils.clearLogPrefix();
216    }
217
218    public EventQueue getEventQueue() {
219        return eventQueue;
220    }
221
222    public class EventWorker implements Runnable {
223
224        @Override
225        public void run() {
226            if (Thread.currentThread().isInterrupted()) {
227                return;
228            }
229            try {
230                if (!eventQueue.isEmpty()) {
231                    List<Event> work = eventQueue.pollBatch();
232                    for (Event event : work) {
233                        LOG = LogUtils.setLogPrefix(LOG, event);
234                        LOG.debug("Processing event : {0}", event);
235                        MessageType msgType = event.getMsgType();
236                        List<?> listeners = listenerMap.get(msgType);
237                        if (listeners != null) {
238                            Iterator<?> iter = listeners.iterator();
239                            while (iter.hasNext()) {
240                                try {
241                                    if (msgType == MessageType.JOB) {
242                                        invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);
243                                    }
244                                    else if (msgType == MessageType.SLA) {
245                                        invokeSLAEventListener((SLAEventListener) iter.next(), (SLAEvent) event);
246                                    }
247                                    else {
248                                        iter.next();
249                                    }
250                                }
251                                catch (Throwable error) {
252                                    XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
253                                            error);
254                                }
255                            }
256                        }
257                    }
258                }
259            }
260            catch (Throwable error) {
261                XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
262                        error);
263            }
264        }
265
266        private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
267            switch (event.getAppType()) {
268                case WORKFLOW_JOB:
269                    jobListener.onWorkflowJobEvent((WorkflowJobEvent)event);
270                    break;
271                case WORKFLOW_ACTION:
272                    jobListener.onWorkflowActionEvent((WorkflowActionEvent)event);
273                    break;
274                case COORDINATOR_JOB:
275                    jobListener.onCoordinatorJobEvent((CoordinatorJobEvent)event);
276                    break;
277                case COORDINATOR_ACTION:
278                    jobListener.onCoordinatorActionEvent((CoordinatorActionEvent)event);
279                    break;
280                case BUNDLE_JOB:
281                    jobListener.onBundleJobEvent((BundleJobEvent)event);
282                    break;
283                default:
284                    XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
285                            event.getAppType());
286            }
287        }
288
289        private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) {
290            switch (event.getEventStatus()) {
291                case START_MET:
292                    slaListener.onStartMet(event);
293                    break;
294                case START_MISS:
295                    slaListener.onStartMiss(event);
296                    break;
297                case END_MET:
298                    slaListener.onEndMet(event);
299                    break;
300                case END_MISS:
301                    slaListener.onEndMiss(event);
302                    break;
303                case DURATION_MET:
304                    slaListener.onDurationMet(event);
305                    break;
306                case DURATION_MISS:
307                    slaListener.onDurationMiss(event);
308                    break;
309                default:
310                    XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getSLAStatus());
311            }
312        }
313    }
314
315}