001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.oozie.event;
020    
021    import java.util.ArrayList;
022    import java.util.List;
023    import java.util.concurrent.ConcurrentLinkedQueue;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.oozie.client.event.Event;
028    import org.apache.oozie.service.EventHandlerService;
029    import org.apache.oozie.util.XLog;
030    
031    /**
032     * An implementation of the EventQueue, defining a memory-based data structure
033     * holding the events
034     */
035    public class MemoryEventQueue implements EventQueue {
036    
037        private static ConcurrentLinkedQueue<EventQueueElement> eventQueue;
038        private static AtomicInteger currentSize;
039        private static int maxSize;
040        private static XLog LOG;
041        private static int batchSize;
042    
043        @Override
044        public void init(Configuration conf) {
045            eventQueue = new ConcurrentLinkedQueue<EventQueueElement>();
046            maxSize = conf.getInt(EventHandlerService.CONF_QUEUE_SIZE, 10000);
047            currentSize = new AtomicInteger();
048            batchSize = conf.getInt(EventHandlerService.CONF_BATCH_SIZE, 10);
049            LOG = XLog.getLog(getClass());
050            LOG.info("Memory Event Queue initialized with Max size = [{0}], Batch drain size = [{1}]", maxSize, batchSize);
051        }
052    
053        @Override
054        public int getBatchSize() {
055            return batchSize;
056        }
057    
058        @Override
059        public void add(Event e) {
060            EventQueueElement eqe = new EventQueueElement(e);
061            try {
062                if (size() <= maxSize) {
063                    if (eventQueue.add(eqe)) {
064                        currentSize.incrementAndGet();
065                    }
066                }
067                else {
068                    LOG.warn("Queue size [{0}] reached max limit. Element [{1}] not added", size(), e);
069                }
070            }
071            catch (IllegalStateException ise) {
072                LOG.warn("Unable to add event due to " + ise);
073            }
074        }
075    
076        @Override
077        public List<Event> pollBatch() {
078            // batch drain
079            List<Event> eventBatch = new ArrayList<Event>();
080            for (int i = 0; i < batchSize; i++) {
081                EventQueueElement polled = eventQueue.poll();
082                if (polled != null) {
083                    currentSize.decrementAndGet();
084                    eventBatch.add(polled.event);
085                }
086                else {
087                    LOG.trace("Current queue size [{0}] less than polling batch size [{1}]", currentSize.get(), batchSize);
088                    break;
089                }
090            }
091            return eventBatch;
092        }
093    
094        @Override
095        public Event poll() {
096            EventQueueElement polled = eventQueue.poll();
097            if (polled != null) {
098                currentSize.decrementAndGet();
099                return polled.event;
100            }
101            return null;
102        }
103    
104        @Override
105        public boolean isEmpty() {
106            return size() == 0;
107        }
108    
109        @Override
110        public int size() {
111            return currentSize.intValue();
112        }
113    
114        @Override
115        public Event peek() {
116            EventQueueElement peeked = eventQueue.peek();
117            if (peeked != null) {
118                return peeked.event;
119            }
120            return null;
121        }
122    
123        @Override
124        public void clear() {
125            eventQueue.clear();
126            currentSize.set(0);
127        }
128    
129    }