public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>> implements BlockingQueue<PriorityDelayQueue.QueueElement<E>>
The PriorityDelayQueue
avoids starvation by raising elements priority as they age.
To support queuing elements into the future, the JDK DelayQueue
is used.
To support priority queuing, an array of DelayQueue
sub-queues is used. Elements are consumed from the
higher priority sub-queues first. From a sub-queue, elements are available based on their age.
To avoid starvation, there is is maximum wait time for an an element in a sub-queue, after the maximum wait time has elapsed, the element is promoted to the next higher priority sub-queue. Eventually it will reach the maximum priority sub-queue and it will be consumed when it is the oldest element in the that sub-queue.
Every time an element is promoted to a higher priority sub-queue, a new maximum wait time applies.
This class does not use a separate thread for anti-starvation check, instead, the check is performed on polling and seeking operations. This check is performed, the most every 1/2 second.
Modifier and Type | Class and Description |
---|---|
static class |
PriorityDelayQueue.QueueElement<E>
Element wrapper required by the queue.
|
Modifier and Type | Field and Description |
---|---|
static long |
ANTI_STARVATION_INTERVAL
Frequency, in milliseconds, of the anti-starvation check.
|
protected AtomicInteger |
currentSize |
protected ReentrantLock |
lock |
protected int |
priorities |
protected DelayQueue<PriorityDelayQueue.QueueElement<E>>[] |
queues |
Constructor and Description |
---|
PriorityDelayQueue(int priorities,
long maxWait,
TimeUnit unit,
int maxSize)
Create a
PriorityDelayQueue . |
Modifier and Type | Method and Description |
---|---|
boolean |
add(PriorityDelayQueue.QueueElement<E> queueElement)
Inserts the specified element into this queue if it is possible to do
so immediately without violating capacity restrictions, returning
true upon success and throwing an
IllegalStateException if no space is currently available.
|
protected void |
antiStarvation()
Run the anti-starvation check every
ANTI_STARVATION_INTERVAL milliseconds. |
void |
clear()
Removes all of the elements from this queue.
|
protected void |
debug(String msgTemplate,
Object... msgArgs)
Method for debugging purposes.
|
int |
drainTo(Collection<? super PriorityDelayQueue.QueueElement<E>> c)
Remove all available elements from this queue and adds them
to the given collection.
|
int |
drainTo(Collection<? super PriorityDelayQueue.QueueElement<E>> c,
int maxElements)
Remove at most the given number of available elements from
this queue and adds them to the given collection.
|
long |
getMaxSize()
Return the maximum queue size.
|
long |
getMaxWait(TimeUnit unit)
Return the max wait time for elements before they are promoted to the next higher priority.
|
int |
getPriorities()
Return number of priorities the queue supports.
|
Iterator<PriorityDelayQueue.QueueElement<E>> |
iterator()
Return an iterator over all the
PriorityDelayQueue.QueueElement elements (both expired and unexpired) in this queue. |
boolean |
offer(PriorityDelayQueue.QueueElement<E> queueElement)
Insert the specified element into the queue.
|
boolean |
offer(PriorityDelayQueue.QueueElement<E> e,
long timeout,
TimeUnit unit)
Insert the specified element into this queue, waiting up to the
specified wait time if necessary for space to become available.
|
PriorityDelayQueue.QueueElement<E> |
peek()
Retrieve, but does not remove, the head of this queue, or returns null if this queue is empty.
|
PriorityDelayQueue.QueueElement<E> |
poll()
Retrieve and remove the head of this queue, or return null if this queue has no elements with an expired
delay.
|
PriorityDelayQueue.QueueElement<E> |
poll(long timeout,
TimeUnit unit)
Retrieve and removes the head of this queue, waiting up to the
specified wait time if necessary for an element to become available.
|
void |
put(PriorityDelayQueue.QueueElement<E> e)
Insert the specified element into this queue, waiting if necessary
for space to become available.
|
int |
remainingCapacity()
Return the number of additional elements that this queue can ideally
(in the absence of memory or resource constraints) accept without
blocking, or Integer.MAX_VALUE if there is no intrinsic
limit.
|
int |
size()
Return the number of elements in the queue.
|
int[] |
sizes()
Return the number of elements on each priority sub-queue.
|
PriorityDelayQueue.QueueElement<E> |
take()
Retrieve and removes the head of this queue, waiting if necessary
until an element becomes available.
|
addAll, element, remove
contains, containsAll, isEmpty, remove, removeAll, retainAll, toArray, toArray, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
contains, remove
addAll, containsAll, equals, hashCode, isEmpty, parallelStream, removeAll, removeIf, retainAll, spliterator, stream, toArray, toArray
public static final long ANTI_STARVATION_INTERVAL
protected int priorities
protected DelayQueue<PriorityDelayQueue.QueueElement<E>>[] queues
protected final transient ReentrantLock lock
protected AtomicInteger currentSize
public PriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize)
PriorityDelayQueue
.priorities
- number of priorities the queue will support.maxWait
- max wait time for elements before they are promoted to the next higher priority.unit
- time unit of the max wait time.maxSize
- maximum size of the queue, -1 means unbounded.public int getPriorities()
public long getMaxWait(TimeUnit unit)
unit
- time unit of the max wait time.public long getMaxSize()
-1
the queue is unbounded.public Iterator<PriorityDelayQueue.QueueElement<E>> iterator()
PriorityDelayQueue.QueueElement
elements (both expired and unexpired) in this queue. The
iterator does not return the elements in any particular order. The returned Iterator is a "weakly
consistent" iterator that will never throw ConcurrentModificationException
, and guarantees to traverse
elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any
modifications subsequent to construction.iterator
in interface Iterable<PriorityDelayQueue.QueueElement<E>>
iterator
in interface Collection<PriorityDelayQueue.QueueElement<E>>
iterator
in class AbstractCollection<PriorityDelayQueue.QueueElement<E>>
PriorityDelayQueue.QueueElement
elements in this queue.public int size()
size
in interface Collection<PriorityDelayQueue.QueueElement<E>>
size
in class AbstractCollection<PriorityDelayQueue.QueueElement<E>>
public int[] sizes()
public boolean add(PriorityDelayQueue.QueueElement<E> queueElement)
offer
.add
in interface Collection<PriorityDelayQueue.QueueElement<E>>
add
in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
add
in interface Queue<PriorityDelayQueue.QueueElement<E>>
add
in class AbstractQueue<PriorityDelayQueue.QueueElement<E>>
queueElement
- the PriorityDelayQueue.QueueElement
element to add.Collection.add(E)
)IllegalStateException
- if the element cannot be added at this
time due to capacity restrictionsClassCastException
- if the class of the specified element
prevents it from being added to this queueNullPointerException
- if the specified element is nullIllegalArgumentException
- if some property of the specified
element prevents it from being added to this queuepublic boolean offer(PriorityDelayQueue.QueueElement<E> queueElement)
The element is added with minimun priority and no delay.
offer
in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
offer
in interface Queue<PriorityDelayQueue.QueueElement<E>>
queueElement
- the element to add.NullPointerException
- if the specified element is nullpublic PriorityDelayQueue.QueueElement<E> poll()
The retrieved element is the oldest one from the highest priority sub-queue.
Invocations to this method run the anti-starvation (once every interval check).
poll
in interface Queue<PriorityDelayQueue.QueueElement<E>>
public PriorityDelayQueue.QueueElement<E> peek()
peek
in interface Queue<PriorityDelayQueue.QueueElement<E>>
protected void antiStarvation()
ANTI_STARVATION_INTERVAL
milliseconds.
It promotes elements beyond max wait time to the next higher priority sub-queue.
protected void debug(String msgTemplate, Object... msgArgs)
This method should be overriden for logging purposes.
Message templates used by this class are in JDK's MessageFormat syntax.
msgTemplate
- message template.msgArgs
- arguments for the message template.public void put(PriorityDelayQueue.QueueElement<E> e) throws InterruptedException
NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.
put
in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
e
- the element to addInterruptedException
- if interrupted while waitingClassCastException
- if the class of the specified element
prevents it from being added to this queueNullPointerException
- if the specified element is nullIllegalArgumentException
- if some property of the specified
element prevents it from being added to this queuepublic boolean offer(PriorityDelayQueue.QueueElement<E> e, long timeout, TimeUnit unit) throws InterruptedException
IMPORTANT: This implementation forces the addition of the element to the queue regardless of the queue current size. The timeout value is ignored as the element is added immediately.
NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.
offer
in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
e
- the element to addtimeout
- how long to wait before giving up, in units of
unitunit
- a TimeUnit determining how to interpret the
timeout parameterInterruptedException
- if interrupted while waitingClassCastException
- if the class of the specified element
prevents it from being added to this queueNullPointerException
- if the specified element is nullIllegalArgumentException
- if some property of the specified
element prevents it from being added to this queuepublic PriorityDelayQueue.QueueElement<E> take() throws InterruptedException
IMPORTANT: This implementation has a delay of up to 10ms (when the queue is empty) to detect a new element is available. It is doing a 10ms sleep.
NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.
take
in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
InterruptedException
- if interrupted while waitingpublic PriorityDelayQueue.QueueElement<E> poll(long timeout, TimeUnit unit) throws InterruptedException
NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.
poll
in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
timeout
- how long to wait before giving up, in units of
unitunit
- a TimeUnit determining how to interpret the
timeout parameterInterruptedException
- if interrupted while waitingpublic int remainingCapacity()
Note that you cannot always tell if an attempt to insert an element will succeed by inspecting remainingCapacity because it may be the case that another thread is about to insert or remove an element.
NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.
remainingCapacity
in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
public int drainTo(Collection<? super PriorityDelayQueue.QueueElement<E>> c)
NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.
drainTo
in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
c
- the collection to transfer elements intoUnsupportedOperationException
- if addition of elements
is not supported by the specified collectionClassCastException
- if the class of an element of this queue
prevents it from being added to the specified collectionNullPointerException
- if the specified collection is nullIllegalArgumentException
- if the specified collection is this
queue, or some property of an element of this queue prevents
it from being added to the specified collectionpublic int drainTo(Collection<? super PriorityDelayQueue.QueueElement<E>> c, int maxElements)
NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.
drainTo
in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
c
- the collection to transfer elements intomaxElements
- the maximum number of elements to transferUnsupportedOperationException
- if addition of elements
is not supported by the specified collectionClassCastException
- if the class of an element of this queue
prevents it from being added to the specified collectionNullPointerException
- if the specified collection is nullIllegalArgumentException
- if the specified collection is this
queue, or some property of an element of this queue prevents
it from being added to the specified collectionpublic void clear()
clear
in interface Collection<PriorityDelayQueue.QueueElement<E>>
clear
in class AbstractQueue<PriorityDelayQueue.QueueElement<E>>
Copyright © 2018 Apache Software Foundation. All rights reserved.