This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.oozie.event;
020
021 import java.util.ArrayList;
022 import java.util.List;
023 import java.util.concurrent.ConcurrentLinkedQueue;
024 import java.util.concurrent.atomic.AtomicInteger;
025
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.oozie.client.event.Event;
028 import org.apache.oozie.service.EventHandlerService;
029 import org.apache.oozie.util.XLog;
030
031 /**
032 * An implementation of the EventQueue, defining a memory-based data structure
033 * holding the events
034 */
035 public class MemoryEventQueue implements EventQueue {
036
037 private static ConcurrentLinkedQueue<EventQueueElement> eventQueue;
038 private static AtomicInteger currentSize;
039 private static int maxSize;
040 private static XLog LOG;
041 private static int batchSize;
042
043 @Override
044 public void init(Configuration conf) {
045 eventQueue = new ConcurrentLinkedQueue<EventQueueElement>();
046 maxSize = conf.getInt(EventHandlerService.CONF_QUEUE_SIZE, 10000);
047 currentSize = new AtomicInteger();
048 batchSize = conf.getInt(EventHandlerService.CONF_BATCH_SIZE, 10);
049 LOG = XLog.getLog(getClass());
050 LOG.info("Memory Event Queue initialized with Max size = [{0}], Batch drain size = [{1}]", maxSize, batchSize);
051 }
052
053 @Override
054 public int getBatchSize() {
055 return batchSize;
056 }
057
058 @Override
059 public void add(Event e) {
060 EventQueueElement eqe = new EventQueueElement(e);
061 try {
062 if (size() <= maxSize) {
063 if (eventQueue.add(eqe)) {
064 currentSize.incrementAndGet();
065 }
066 }
067 else {
068 LOG.warn("Queue size [{0}] reached max limit. Element [{1}] not added", size(), e);
069 }
070 }
071 catch (IllegalStateException ise) {
072 LOG.warn("Unable to add event due to " + ise);
073 }
074 }
075
076 @Override
077 public List<Event> pollBatch() {
078 // batch drain
079 List<Event> eventBatch = new ArrayList<Event>();
080 for (int i = 0; i < batchSize; i++) {
081 EventQueueElement polled = eventQueue.poll();
082 if (polled != null) {
083 currentSize.decrementAndGet();
084 eventBatch.add(polled.event);
085 }
086 else {
087 LOG.trace("Current queue size [{0}] less than polling batch size [{1}]", currentSize.get(), batchSize);
088 break;
089 }
090 }
091 return eventBatch;
092 }
093
094 @Override
095 public Event poll() {
096 EventQueueElement polled = eventQueue.poll();
097 if (polled != null) {
098 currentSize.decrementAndGet();
099 return polled.event;
100 }
101 return null;
102 }
103
104 @Override
105 public boolean isEmpty() {
106 return size() == 0;
107 }
108
109 @Override
110 public int size() {
111 return currentSize.intValue();
112 }
113
114 @Override
115 public Event peek() {
116 EventQueueElement peeked = eventQueue.peek();
117 if (peeked != null) {
118 return peeked.event;
119 }
120 return null;
121 }
122
123 @Override
124 public void clear() {
125 eventQueue.clear();
126 currentSize.set(0);
127 }
128
129 }