001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.oozie.ErrorCode;
023import org.apache.oozie.event.BundleJobEvent;
024import org.apache.oozie.event.CoordinatorActionEvent;
025import org.apache.oozie.event.CoordinatorJobEvent;
026import org.apache.oozie.client.event.Event;
027import org.apache.oozie.client.event.Event.MessageType;
028import org.apache.oozie.client.event.JobEvent;
029import org.apache.oozie.event.EventQueue;
030import org.apache.oozie.event.MemoryEventQueue;
031import org.apache.oozie.event.WorkflowActionEvent;
032import org.apache.oozie.event.WorkflowJobEvent;
033import org.apache.oozie.event.listener.JobEventListener;
034import org.apache.oozie.sla.listener.SLAEventListener;
035import org.apache.oozie.client.event.SLAEvent;
036import org.apache.oozie.util.LogUtils;
037import org.apache.oozie.util.XLog;
038
039import java.util.ArrayList;
040import java.util.HashMap;
041import java.util.HashSet;
042import java.util.Iterator;
043import java.util.List;
044import java.util.Map;
045import java.util.Set;
046
047/**
048 * Service class that handles the events system - creating events queue,
049 * managing configured properties and managing and invoking various event
050 * listeners via worker threads
051 */
052public class EventHandlerService implements Service {
053
054    public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
055    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
056    public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
057    public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
058    public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types";
059    public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
060    public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads";
061    public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";
062
063    private static EventQueue eventQueue;
064    private XLog LOG;
065    private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
066    private Set<String> apptypes;
067    private static boolean eventsEnabled = false;
068    private int numWorkers;
069
070    @Override
071    public void init(Services services) throws ServiceException {
072        try {
073            Configuration conf = services.getConf();
074            LOG = XLog.getLog(getClass());
075            LOG = XLog.resetPrefix(LOG);
076            Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null);
077            eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
078            eventQueue.init(conf);
079            // initialize app-types to switch on events for
080            initApptypes(conf);
081            // initialize event listeners
082            initEventListeners(conf);
083            // initialize worker threads via Scheduler
084            initWorkerThreads(conf, services);
085            eventsEnabled = true;
086            LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}],"
087                    + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass()
088                    .getName(), listenerMap.toString(), apptypes, numWorkers);
089        }
090        catch (Exception ex) {
091            throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex);
092        }
093    }
094
095    private void initApptypes(Configuration conf) {
096        apptypes = new HashSet<String>();
097        for (String jobtype : conf.getStringCollection(CONF_FILTER_APP_TYPES)) {
098            String tmp = jobtype.trim().toLowerCase();
099            if (tmp.length() == 0) {
100                continue;
101            }
102            apptypes.add(tmp);
103        }
104    }
105
106    private void initEventListeners(Configuration conf) throws Exception {
107        Class<?>[] listenerClass = conf.getClasses(CONF_LISTENERS,
108                org.apache.oozie.jms.JMSJobEventListener.class,
109                org.apache.oozie.sla.listener.SLAJobEventListener.class);
110        for (int i = 0; i < listenerClass.length; i++) {
111            Object listener = null;
112            try {
113                listener = listenerClass[i].newInstance();
114            }
115            catch (InstantiationException e) {
116                LOG.warn("Could not create event listener instance, " + e);
117            }
118            catch (IllegalAccessException e) {
119                LOG.warn("Illegal access to event listener instance, " + e);
120            }
121            addEventListener(listener, conf, listenerClass[i].getName());
122        }
123    }
124
125    @SuppressWarnings({ "rawtypes", "unchecked" })
126    public void addEventListener(Object listener, Configuration conf, String name) throws Exception {
127        if (listener instanceof JobEventListener) {
128            List listenersList = listenerMap.get(MessageType.JOB);
129            if (listenersList == null) {
130                listenersList = new ArrayList();
131                listenerMap.put(MessageType.JOB, listenersList);
132            }
133            listenersList.add(listener);
134            ((JobEventListener) listener).init(conf);
135        }
136        else if (listener instanceof SLAEventListener) {
137            List listenersList = listenerMap.get(MessageType.SLA);
138            if (listenersList == null) {
139                listenersList = new ArrayList();
140                listenerMap.put(MessageType.SLA, listenersList);
141            }
142            listenersList.add(listener);
143            ((SLAEventListener) listener).init(conf);
144        }
145        else {
146            LOG.warn("Event listener [{0}] is of undefined type", name);
147        }
148    }
149
150    public static boolean isEnabled() {
151        return eventsEnabled;
152    }
153
154    private void initWorkerThreads(Configuration conf, Services services) throws ServiceException {
155        numWorkers = conf.getInt(CONF_WORKER_THREADS, 3);
156        int interval = conf.getInt(CONF_WORKER_INTERVAL, 30);
157        SchedulerService ss = services.get(SchedulerService.class);
158        int available = ss.getSchedulableThreads(conf);
159        if (numWorkers + 3 > available) {
160            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested ["
161                    + numWorkers + "] cannot be handled with current settings. Increase "
162                    + SchedulerService.SCHEDULER_THREADS);
163        }
164        Runnable eventWorker = new EventWorker();
165        // schedule staggered runnables every 1 min interval by default
166        for (int i = 0; i < numWorkers; i++) {
167            ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC);
168        }
169    }
170
171    @Override
172    public void destroy() {
173        eventsEnabled = false;
174        for (MessageType type : listenerMap.keySet()) {
175            Iterator<?> iter = listenerMap.get(type).iterator();
176            while (iter.hasNext()) {
177                if (type == MessageType.JOB) {
178                    ((JobEventListener) iter.next()).destroy();
179                }
180                else if (type == MessageType.SLA) {
181                    ((SLAEventListener) iter.next()).destroy();
182                }
183            }
184        }
185    }
186
187    @Override
188    public Class<? extends Service> getInterface() {
189        return EventHandlerService.class;
190    }
191
192    public boolean isSupportedApptype(String appType) {
193        if (!apptypes.contains(appType.toLowerCase())) {
194            return false;
195        }
196        return true;
197    }
198
199    public void setAppTypes(Set<String> types) {
200        apptypes = types;
201    }
202
203    public Set<String> getAppTypes() {
204        return apptypes;
205    }
206
207    public String listEventListeners() {
208        return listenerMap.toString();
209    }
210
211    public void queueEvent(Event event) {
212        LOG = LogUtils.setLogPrefix(LOG, event);
213        LOG.debug("Queueing event : {0}", event);
214        LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
215        eventQueue.add(event);
216        LogUtils.clearLogPrefix();
217    }
218
219    public EventQueue getEventQueue() {
220        return eventQueue;
221    }
222
223    public class EventWorker implements Runnable {
224
225        @Override
226        public void run() {
227            if (Thread.currentThread().isInterrupted()) {
228                return;
229            }
230            try {
231                if (!eventQueue.isEmpty()) {
232                    List<Event> work = eventQueue.pollBatch();
233                    for (Event event : work) {
234                        LOG = LogUtils.setLogPrefix(LOG, event);
235                        LOG.debug("Processing event : {0}", event);
236                        MessageType msgType = event.getMsgType();
237                        List<?> listeners = listenerMap.get(msgType);
238                        if (listeners != null) {
239                            Iterator<?> iter = listeners.iterator();
240                            while (iter.hasNext()) {
241                                try {
242                                    if (msgType == MessageType.JOB) {
243                                        invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);
244                                    }
245                                    else if (msgType == MessageType.SLA) {
246                                        invokeSLAEventListener((SLAEventListener) iter.next(), (SLAEvent) event);
247                                    }
248                                    else {
249                                        iter.next();
250                                    }
251                                }
252                                catch (Throwable error) {
253                                    XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
254                                            error);
255                                }
256                            }
257                        }
258                    }
259                }
260            }
261            catch (Throwable error) {
262                XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
263                        error);
264            }
265        }
266
267        private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
268            switch (event.getAppType()) {
269                case WORKFLOW_JOB:
270                    jobListener.onWorkflowJobEvent((WorkflowJobEvent)event);
271                    break;
272                case WORKFLOW_ACTION:
273                    jobListener.onWorkflowActionEvent((WorkflowActionEvent)event);
274                    break;
275                case COORDINATOR_JOB:
276                    jobListener.onCoordinatorJobEvent((CoordinatorJobEvent)event);
277                    break;
278                case COORDINATOR_ACTION:
279                    jobListener.onCoordinatorActionEvent((CoordinatorActionEvent)event);
280                    break;
281                case BUNDLE_JOB:
282                    jobListener.onBundleJobEvent((BundleJobEvent)event);
283                    break;
284                default:
285                    XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
286                            event.getAppType());
287            }
288        }
289
290        private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) {
291            switch (event.getEventStatus()) {
292                case START_MET:
293                    slaListener.onStartMet(event);
294                    break;
295                case START_MISS:
296                    slaListener.onStartMiss(event);
297                    break;
298                case END_MET:
299                    slaListener.onEndMet(event);
300                    break;
301                case END_MISS:
302                    slaListener.onEndMiss(event);
303                    break;
304                case DURATION_MET:
305                    slaListener.onDurationMet(event);
306                    break;
307                case DURATION_MISS:
308                    slaListener.onDurationMiss(event);
309                    break;
310                default:
311                    XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getSLAStatus());
312            }
313        }
314    }
315
316}