001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.action.hadoop.PasswordMasker;
034import org.apache.oozie.client.event.Event;
035import org.apache.oozie.client.event.Event.MessageType;
036import org.apache.oozie.client.event.JobEvent;
037import org.apache.oozie.client.event.SLAEvent;
038import org.apache.oozie.event.BundleJobEvent;
039import org.apache.oozie.event.CoordinatorActionEvent;
040import org.apache.oozie.event.CoordinatorJobEvent;
041import org.apache.oozie.event.EventQueue;
042import org.apache.oozie.event.MemoryEventQueue;
043import org.apache.oozie.event.WorkflowActionEvent;
044import org.apache.oozie.event.WorkflowJobEvent;
045import org.apache.oozie.event.listener.JobEventListener;
046import org.apache.oozie.sla.listener.SLAEventListener;
047import org.apache.oozie.util.LogUtils;
048import org.apache.oozie.util.XLog;
049
050/**
051 * Service class that handles the events system - creating events queue,
052 * managing configured properties and managing and invoking various event
053 * listeners via worker threads
054 */
055public class EventHandlerService implements Service {
056
057    public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
058    public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
059    public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
060    public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
061    public static final String CONF_FILTER_APP_TYPES = CONF_PREFIX + "filter.app.types";
062    public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
063    public static final String CONF_WORKER_THREADS = CONF_PREFIX + "worker.threads";
064    public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";
065
066    private static EventQueue eventQueue;
067    private XLog LOG;
068    private Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
069    private Set<String> apptypes;
070    private static boolean eventsEnabled = false;
071    private int numWorkers;
072
073    @Override
074    public void init(Services services) throws ServiceException {
075        try {
076            Configuration conf = services.getConf();
077            LOG = XLog.getLog(getClass());
078            Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) ConfigurationService.getClass
079                    (conf, CONF_EVENT_QUEUE);
080            eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
081            eventQueue.init(conf);
082            // initialize app-types to switch on events for
083            initApptypes(conf);
084            // initialize event listeners
085            initEventListeners(conf);
086            // initialize worker threads via Scheduler
087            initWorkerThreads(conf, services);
088            eventsEnabled = true;
089            LOG.info("EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}],"
090                    + " Events configured for App-types = [{2}], Num Worker Threads = [{3}]", eventQueue.getClass()
091                    .getName(), listenerMap.toString(), apptypes, numWorkers);
092        }
093        catch (Exception ex) {
094            throw new ServiceException(ErrorCode.E0100, ex.getMessage(), ex);
095        }
096    }
097
098    private void initApptypes(Configuration conf) {
099        apptypes = new HashSet<String>();
100        for (String jobtype : ConfigurationService.getStrings(conf, CONF_FILTER_APP_TYPES)) {
101            String tmp = jobtype.trim().toLowerCase();
102            if (tmp.length() == 0) {
103                continue;
104            }
105            apptypes.add(tmp);
106        }
107    }
108
109    private void initEventListeners(Configuration conf) throws Exception {
110        Class<?>[] listenerClass = ConfigurationService.getClasses(conf, CONF_LISTENERS);
111        for (int i = 0; i < listenerClass.length; i++) {
112            Object listener = null;
113            try {
114                listener = listenerClass[i].newInstance();
115            }
116            catch (InstantiationException e) {
117                LOG.warn("Could not create event listener instance, " + e);
118            }
119            catch (IllegalAccessException e) {
120                LOG.warn("Illegal access to event listener instance, " + e);
121            }
122            addEventListener(listener, conf, listenerClass[i].getName());
123        }
124    }
125
126    @SuppressWarnings({ "rawtypes", "unchecked" })
127    public void addEventListener(Object listener, Configuration conf, String name) throws Exception {
128        if (listener instanceof JobEventListener) {
129            List listenersList = listenerMap.get(MessageType.JOB);
130            if (listenersList == null) {
131                listenersList = new ArrayList();
132                listenerMap.put(MessageType.JOB, listenersList);
133            }
134            listenersList.add(listener);
135            ((JobEventListener) listener).init(conf);
136        }
137        else if (listener instanceof SLAEventListener) {
138            List listenersList = listenerMap.get(MessageType.SLA);
139            if (listenersList == null) {
140                listenersList = new ArrayList();
141                listenerMap.put(MessageType.SLA, listenersList);
142            }
143            listenersList.add(listener);
144            ((SLAEventListener) listener).init(conf);
145        }
146        else {
147            LOG.warn("Event listener [{0}] is of undefined type", name);
148        }
149    }
150
151    public static boolean isEnabled() {
152        return eventsEnabled;
153    }
154
155    private void initWorkerThreads(Configuration conf, Services services) throws ServiceException {
156        numWorkers = ConfigurationService.getInt(conf, CONF_WORKER_THREADS);
157        int interval = ConfigurationService.getInt(conf, CONF_WORKER_INTERVAL);
158        SchedulerService ss = services.get(SchedulerService.class);
159        int available = ss.getSchedulableThreads(conf);
160        if (numWorkers + 3 > available) {
161            throw new ServiceException(ErrorCode.E0100, getClass().getName(), "Event worker threads requested ["
162                    + numWorkers + "] cannot be handled with current settings. Increase "
163                    + SchedulerService.SCHEDULER_THREADS);
164        }
165        Runnable eventWorker = new EventWorker();
166        // schedule staggered runnables every 1 min interval by default
167        for (int i = 0; i < numWorkers; i++) {
168            ss.schedule(eventWorker, 10 + i * 20, interval, SchedulerService.Unit.SEC);
169        }
170    }
171
172    @Override
173    public void destroy() {
174        eventsEnabled = false;
175
176        for (Entry<MessageType, List<?>> entry : listenerMap.entrySet()) {
177            List<?> listeners = entry.getValue();
178            MessageType type = entry.getKey();
179
180            for (Object listener : listeners) {
181                if (type == MessageType.JOB) {
182                    ((JobEventListener) listener).destroy();
183                } else if (type == MessageType.SLA) {
184                    ((SLAEventListener) listener).destroy();
185                }
186            }
187        }
188    }
189
190    @Override
191    public Class<? extends Service> getInterface() {
192        return EventHandlerService.class;
193    }
194
195    public boolean isSupportedApptype(String appType) {
196        if (!apptypes.contains(appType.toLowerCase())) {
197            return false;
198        }
199        return true;
200    }
201
202    public void setAppTypes(Set<String> types) {
203        apptypes = types;
204    }
205
206    public Set<String> getAppTypes() {
207        return apptypes;
208    }
209
210    public String listEventListeners() {
211        return listenerMap.toString();
212    }
213
214    public void queueEvent(Event event) {
215        LOG = LogUtils.setLogPrefix(LOG, event);
216        LOG.debug("Queueing event : {0}", event);
217        LOG.trace("Stack trace while queueing event : {0}", event, new Throwable());
218        eventQueue.add(event);
219        LogUtils.clearLogPrefix();
220    }
221
222    public EventQueue getEventQueue() {
223        return eventQueue;
224    }
225
226    public class EventWorker implements Runnable {
227
228        @Override
229        public void run() {
230            if (Thread.currentThread().isInterrupted()) {
231                return;
232            }
233            try {
234                if (!eventQueue.isEmpty()) {
235                    List<Event> work = eventQueue.pollBatch();
236                    for (Event event : work) {
237                        LOG = LogUtils.setLogPrefix(LOG, event);
238                        LOG.debug("Processing event : {0}", event);
239                        MessageType msgType = event.getMsgType();
240                        List<?> listeners = listenerMap.get(msgType);
241                        if (listeners != null) {
242                            Iterator<?> iter = listeners.iterator();
243                            while (iter.hasNext()) {
244                                try {
245                                    if (msgType == MessageType.JOB) {
246                                        invokeJobEventListener((JobEventListener) iter.next(), (JobEvent) event);
247                                    }
248                                    else if (msgType == MessageType.SLA) {
249                                        invokeSLAEventListener((SLAEventListener) iter.next(), (SLAEvent) event);
250                                    }
251                                    else {
252                                        iter.next();
253                                    }
254                                }
255                                catch (Throwable error) {
256                                    XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
257                                            error);
258                                    XLog.getLog(EventHandlerService.class).warn("Throwable in EventWorker thread run. " +
259                                                    "Error message: {0}",
260                                            new PasswordMasker().maskPasswordsIfNecessary(error.getMessage()));
261                                }
262                            }
263                        }
264                    }
265                }
266            }
267            catch (Throwable error) {
268                XLog.getLog(EventHandlerService.class).debug("Throwable in EventWorker thread run : ",
269                        error);
270                XLog.getLog(EventHandlerService.class).warn("Throwable in EventWorker thread run. " +
271                                "Error message: {0}",
272                        new PasswordMasker().maskPasswordsIfNecessary(error.getMessage()));
273            }
274        }
275
276        private void invokeJobEventListener(JobEventListener jobListener, JobEvent event) {
277            switch (event.getAppType()) {
278                case WORKFLOW_JOB:
279                    jobListener.onWorkflowJobEvent((WorkflowJobEvent)event);
280                    break;
281                case WORKFLOW_ACTION:
282                    jobListener.onWorkflowActionEvent((WorkflowActionEvent)event);
283                    break;
284                case COORDINATOR_JOB:
285                    jobListener.onCoordinatorJobEvent((CoordinatorJobEvent)event);
286                    break;
287                case COORDINATOR_ACTION:
288                    jobListener.onCoordinatorActionEvent((CoordinatorActionEvent)event);
289                    break;
290                case BUNDLE_JOB:
291                    jobListener.onBundleJobEvent((BundleJobEvent)event);
292                    break;
293                default:
294                    XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
295                            event.getAppType());
296            }
297        }
298
299        private void invokeSLAEventListener(SLAEventListener slaListener, SLAEvent event) {
300            switch (event.getEventStatus()) {
301                case START_MET:
302                    slaListener.onStartMet(event);
303                    break;
304                case START_MISS:
305                    slaListener.onStartMiss(event);
306                    break;
307                case END_MET:
308                    slaListener.onEndMet(event);
309                    break;
310                case END_MISS:
311                    slaListener.onEndMiss(event);
312                    break;
313                case DURATION_MET:
314                    slaListener.onDurationMet(event);
315                    break;
316                case DURATION_MISS:
317                    slaListener.onDurationMiss(event);
318                    break;
319                default:
320                    XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getSLAStatus());
321            }
322        }
323    }
324
325}