001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.oozie.event; 020 021 import java.util.ArrayList; 022 import java.util.List; 023 import java.util.concurrent.ConcurrentLinkedQueue; 024 import java.util.concurrent.atomic.AtomicInteger; 025 026 import org.apache.hadoop.conf.Configuration; 027 import org.apache.oozie.client.event.Event; 028 import org.apache.oozie.service.EventHandlerService; 029 import org.apache.oozie.util.XLog; 030 031 /** 032 * An implementation of the EventQueue, defining a memory-based data structure 033 * holding the events 034 */ 035 public class MemoryEventQueue implements EventQueue { 036 037 private static ConcurrentLinkedQueue<EventQueueElement> eventQueue; 038 private static AtomicInteger currentSize; 039 private static int maxSize; 040 private static XLog LOG; 041 private static int batchSize; 042 043 @Override 044 public void init(Configuration conf) { 045 eventQueue = new ConcurrentLinkedQueue<EventQueueElement>(); 046 maxSize = conf.getInt(EventHandlerService.CONF_QUEUE_SIZE, 10000); 047 currentSize = new AtomicInteger(); 048 batchSize = conf.getInt(EventHandlerService.CONF_BATCH_SIZE, 10); 049 LOG = XLog.getLog(getClass()); 050 LOG.info("Memory Event Queue initialized with Max size = [{0}], Batch drain size = [{1}]", maxSize, batchSize); 051 } 052 053 @Override 054 public int getBatchSize() { 055 return batchSize; 056 } 057 058 @Override 059 public void add(Event e) { 060 EventQueueElement eqe = new EventQueueElement(e); 061 try { 062 if (size() <= maxSize) { 063 if (eventQueue.add(eqe)) { 064 currentSize.incrementAndGet(); 065 } 066 } 067 else { 068 LOG.warn("Queue size [{0}] reached max limit. Element [{1}] not added", size(), e); 069 } 070 } 071 catch (IllegalStateException ise) { 072 LOG.warn("Unable to add event due to " + ise); 073 } 074 } 075 076 @Override 077 public List<Event> pollBatch() { 078 // batch drain 079 List<Event> eventBatch = new ArrayList<Event>(); 080 for (int i = 0; i < batchSize; i++) { 081 EventQueueElement polled = eventQueue.poll(); 082 if (polled != null) { 083 currentSize.decrementAndGet(); 084 eventBatch.add(polled.event); 085 } 086 else { 087 LOG.trace("Current queue size [{0}] less than polling batch size [{1}]", currentSize.get(), batchSize); 088 break; 089 } 090 } 091 return eventBatch; 092 } 093 094 @Override 095 public Event poll() { 096 EventQueueElement polled = eventQueue.poll(); 097 if (polled != null) { 098 currentSize.decrementAndGet(); 099 return polled.event; 100 } 101 return null; 102 } 103 104 @Override 105 public boolean isEmpty() { 106 return size() == 0; 107 } 108 109 @Override 110 public int size() { 111 return currentSize.intValue(); 112 } 113 114 @Override 115 public Event peek() { 116 EventQueueElement peeked = eventQueue.peek(); 117 if (peeked != null) { 118 return peeked.event; 119 } 120 return null; 121 } 122 123 @Override 124 public void clear() { 125 eventQueue.clear(); 126 currentSize.set(0); 127 } 128 129 }