001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.event;
020
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.ConcurrentLinkedQueue;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.client.event.Event;
028import org.apache.oozie.service.EventHandlerService;
029import org.apache.oozie.util.XLog;
030
031/**
032 * An implementation of the EventQueue, defining a memory-based data structure
033 * holding the events
034 */
035public class MemoryEventQueue implements EventQueue {
036
037    private static ConcurrentLinkedQueue<EventQueueElement> eventQueue;
038    private static AtomicInteger currentSize;
039    private static int maxSize;
040    private static XLog LOG;
041    private static int batchSize;
042
043    @Override
044    public void init(Configuration conf) {
045        eventQueue = new ConcurrentLinkedQueue<EventQueueElement>();
046        maxSize = conf.getInt(EventHandlerService.CONF_QUEUE_SIZE, 10000);
047        currentSize = new AtomicInteger();
048        batchSize = conf.getInt(EventHandlerService.CONF_BATCH_SIZE, 10);
049        LOG = XLog.getLog(getClass());
050        LOG.info("Memory Event Queue initialized with Max size = [{0}], Batch drain size = [{1}]", maxSize, batchSize);
051    }
052
053    @Override
054    public int getBatchSize() {
055        return batchSize;
056    }
057
058    @Override
059    public void add(Event e) {
060        EventQueueElement eqe = new EventQueueElement(e);
061        try {
062            if (size() <= maxSize) {
063                if (eventQueue.add(eqe)) {
064                    currentSize.incrementAndGet();
065                }
066            }
067            else {
068                LOG.warn("Queue size [{0}] reached max limit. Element [{1}] not added", size(), e);
069            }
070        }
071        catch (IllegalStateException ise) {
072            LOG.warn("Unable to add event due to " + ise);
073        }
074    }
075
076    @Override
077    public List<Event> pollBatch() {
078        // batch drain
079        List<Event> eventBatch = new ArrayList<Event>();
080        for (int i = 0; i < batchSize; i++) {
081            EventQueueElement polled = eventQueue.poll();
082            if (polled != null) {
083                currentSize.decrementAndGet();
084                eventBatch.add(polled.event);
085            }
086            else {
087                LOG.trace("Current queue size [{0}] less than polling batch size [{1}]", currentSize.get(), batchSize);
088                break;
089            }
090        }
091        return eventBatch;
092    }
093
094    @Override
095    public Event poll() {
096        EventQueueElement polled = eventQueue.poll();
097        if (polled != null) {
098            currentSize.decrementAndGet();
099            return polled.event;
100        }
101        return null;
102    }
103
104    @Override
105    public boolean isEmpty() {
106        return size() == 0;
107    }
108
109    @Override
110    public int size() {
111        return currentSize.intValue();
112    }
113
114    @Override
115    public Event peek() {
116        EventQueueElement peeked = eventQueue.peek();
117        if (peeked != null) {
118            return peeked.event;
119        }
120        return null;
121    }
122
123    @Override
124    public void clear() {
125        eventQueue.clear();
126        currentSize.set(0);
127    }
128
129}