001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 020package org.apache.oozie.event; 021 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.ConcurrentLinkedQueue; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.oozie.client.event.Event; 029import org.apache.oozie.service.ConfigurationService; 030import org.apache.oozie.service.EventHandlerService; 031import org.apache.oozie.util.XLog; 032 033/** 034 * An implementation of the EventQueue, defining a memory-based data structure 035 * holding the events 036 */ 037public class MemoryEventQueue implements EventQueue { 038 039 private static ConcurrentLinkedQueue<EventQueueElement> eventQueue; 040 private static AtomicInteger currentSize; 041 private static int maxSize; 042 private static XLog LOG; 043 private static int batchSize; 044 045 @Override 046 public void init(Configuration conf) { 047 eventQueue = new ConcurrentLinkedQueue<EventQueueElement>(); 048 maxSize = ConfigurationService.getInt(conf, EventHandlerService.CONF_QUEUE_SIZE); 049 currentSize = new AtomicInteger(); 050 batchSize = ConfigurationService.getInt(conf, EventHandlerService.CONF_BATCH_SIZE); 051 LOG = XLog.getLog(getClass()); 052 LOG.info("Memory Event Queue initialized with Max size = [{0}], Batch drain size = [{1}]", maxSize, batchSize); 053 } 054 055 @Override 056 public int getBatchSize() { 057 return batchSize; 058 } 059 060 @Override 061 public void add(Event e) { 062 EventQueueElement eqe = new EventQueueElement(e); 063 try { 064 if (size() <= maxSize) { 065 if (eventQueue.add(eqe)) { 066 currentSize.incrementAndGet(); 067 } 068 } 069 else { 070 LOG.warn("Queue size [{0}] reached max limit. Element [{1}] not added", size(), e); 071 } 072 } 073 catch (IllegalStateException ise) { 074 LOG.warn("Unable to add event due to " + ise); 075 } 076 } 077 078 @Override 079 public List<Event> pollBatch() { 080 // batch drain 081 List<Event> eventBatch = new ArrayList<Event>(); 082 for (int i = 0; i < batchSize; i++) { 083 EventQueueElement polled = eventQueue.poll(); 084 if (polled != null) { 085 currentSize.decrementAndGet(); 086 eventBatch.add(polled.event); 087 } 088 else { 089 LOG.trace("Current queue size [{0}] less than polling batch size [{1}]", currentSize.get(), batchSize); 090 break; 091 } 092 } 093 return eventBatch; 094 } 095 096 @Override 097 public Event poll() { 098 EventQueueElement polled = eventQueue.poll(); 099 if (polled != null) { 100 currentSize.decrementAndGet(); 101 return polled.event; 102 } 103 return null; 104 } 105 106 @Override 107 public boolean isEmpty() { 108 return size() == 0; 109 } 110 111 @Override 112 public int size() { 113 return currentSize.intValue(); 114 } 115 116 @Override 117 public Event peek() { 118 EventQueueElement peeked = eventQueue.peek(); 119 if (peeked != null) { 120 return peeked.event; 121 } 122 return null; 123 } 124 125 @Override 126 public void clear() { 127 eventQueue.clear(); 128 currentSize.set(0); 129 } 130 131}