org.apache.oozie.util
Class PriorityDelayQueue<E>

java.lang.Object
  extended by java.util.AbstractCollection<E>
      extended by java.util.AbstractQueue<PriorityDelayQueue.QueueElement<E>>
          extended by org.apache.oozie.util.PriorityDelayQueue<E>
All Implemented Interfaces:
Iterable<PriorityDelayQueue.QueueElement<E>>, Collection<PriorityDelayQueue.QueueElement<E>>, BlockingQueue<PriorityDelayQueue.QueueElement<E>>, Queue<PriorityDelayQueue.QueueElement<E>>
Direct Known Subclasses:
PollablePriorityDelayQueue

public class PriorityDelayQueue<E>
extends AbstractQueue<PriorityDelayQueue.QueueElement<E>>
implements BlockingQueue<PriorityDelayQueue.QueueElement<E>>

A Queue implementation that supports queuing elements into the future and priority queuing.

The PriorityDelayQueue avoids starvation by raising the priority of elements as they age.

To support queuing elements into the future, the JDK DelayQueue is used.

To support priority queuing, an array of DelayQueue sub-queues is used. Elements are consumed from the higher priority sub-queues first. From a sub-queue, elements are available based on their age.

To avoid starvation, there is a maximum wait time for an element in a sub-queue. After the maximum wait time has elapsed, the element is promoted to the next higher priority sub-queue. Eventually it will reach the maximum priority sub-queue, where it will be consumed when it is the oldest element in that sub-queue.

Every time an element is promoted to a higher priority sub-queue, a new maximum wait time applies.

This class does not use a separate thread for the anti-starvation check; instead, the check is performed on polling and seeking operations, at most once every 1/2 second.
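
The design described above can be illustrated with a small model built only on the JDK DelayQueue. This is a simplified sketch, not the Oozie source: the class AgingQueueSketch, its Item element, and the explicit nowMs parameters are all hypothetical names introduced to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Simplified, hypothetical model of the design described above (not the Oozie source).
class AgingQueueSketch {

    // Hypothetical element: available once readyAtMs has passed.
    static final class Item implements Delayed {
        final String name;
        final long readyAtMs;
        long enteredAtMs; // when the item entered its current sub-queue

        Item(String name, long readyAtMs) {
            this.name = name;
            this.readyAtMs = readyAtMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // This sketch only ever compares Item instances.
            return Long.compare(readyAtMs, ((Item) other).readyAtMs);
        }
    }

    final List<DelayQueue<Item>> queues = new ArrayList<DelayQueue<Item>>(); // index = priority
    final long maxWaitMs;

    AgingQueueSketch(int priorities, long maxWaitMs) {
        for (int i = 0; i < priorities; i++) {
            queues.add(new DelayQueue<Item>());
        }
        this.maxWaitMs = maxWaitMs;
    }

    void offer(Item item, int priority, long nowMs) {
        item.enteredAtMs = nowMs;
        queues.get(priority).offer(item);
    }

    // Anti-starvation pass: move items that waited longer than maxWaitMs one level up.
    // Walking from high to low priority keeps an item from being promoted twice in one pass.
    void promote(long nowMs) {
        for (int p = queues.size() - 2; p >= 0; p--) {
            DelayQueue<Item> queue = queues.get(p);
            for (Item item : queue) { // DelayQueue iterators tolerate removal during iteration
                if (nowMs - item.enteredAtMs > maxWaitMs && queue.remove(item)) {
                    offer(item, p + 1, nowMs); // a new maximum wait time applies
                }
            }
        }
    }

    // Consume from the highest priority sub-queue that has an expired element.
    Item poll() {
        for (int p = queues.size() - 1; p >= 0; p--) {
            Item item = queues.get(p).poll();
            if (item != null) {
                return item;
            }
        }
        return null;
    }
}
```

In this sketch the caller invokes promote explicitly; the class documented here instead runs the check from within its polling and seeking operations.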


Nested Class Summary
static class PriorityDelayQueue.QueueElement<E>
          Element wrapper required by the queue.
 
Field Summary
static long ANTI_STARVATION_INTERVAL
          Frequency, in milliseconds, of the anti-starvation check.
protected  AtomicInteger currentSize
           
protected  ReentrantLock lock
           
protected  int priorities
           
protected  DelayQueue<PriorityDelayQueue.QueueElement<E>>[] queues
           
 
Constructor Summary
PriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize)
          Create a PriorityDelayQueue.
 
Method Summary
 boolean add(PriorityDelayQueue.QueueElement<E> queueElement)
          Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and throwing an IllegalStateException if no space is currently available.
protected  void antiStarvation()
          Run the anti-starvation check every ANTI_STARVATION_INTERVAL milliseconds.
 void clear()
          Removes all of the elements from this queue.
protected  void debug(String msgTemplate, Object... msgArgs)
          Method for debugging purposes.
 int drainTo(Collection<? super PriorityDelayQueue.QueueElement<E>> c)
          Remove all available elements from this queue and add them to the given collection.
 int drainTo(Collection<? super PriorityDelayQueue.QueueElement<E>> c, int maxElements)
          Remove at most the given number of available elements from this queue and add them to the given collection.
 long getMaxSize()
          Return the maximum queue size.
 long getMaxWait(TimeUnit unit)
          Return the max wait time for elements before they are promoted to the next higher priority.
 int getPriorities()
          Return the number of priorities the queue supports.
 Iterator<PriorityDelayQueue.QueueElement<E>> iterator()
          Return an iterator over all the PriorityDelayQueue.QueueElement elements (both expired and unexpired) in this queue.
 boolean offer(PriorityDelayQueue.QueueElement<E> queueElement)
          Insert the specified element into the queue.
 boolean offer(PriorityDelayQueue.QueueElement<E> e, long timeout, TimeUnit unit)
          Insert the specified element into this queue, waiting up to the specified wait time if necessary for space to become available.
 PriorityDelayQueue.QueueElement<E> peek()
          Retrieve, but do not remove, the head of this queue, or return null if this queue is empty.
 PriorityDelayQueue.QueueElement<E> poll()
          Retrieve and remove the head of this queue, or return null if this queue has no elements with an expired delay.
 PriorityDelayQueue.QueueElement<E> poll(long timeout, TimeUnit unit)
          Retrieve and remove the head of this queue, waiting up to the specified wait time if necessary for an element to become available.
 void put(PriorityDelayQueue.QueueElement<E> e)
          Insert the specified element into this queue, waiting if necessary for space to become available.
 int remainingCapacity()
          Return the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.
 int size()
          Return the number of elements in the queue.
 int[] sizes()
          Return the number of elements on each priority sub-queue.
 PriorityDelayQueue.QueueElement<E> take()
          Retrieve and remove the head of this queue, waiting if necessary until an element becomes available.
 
Methods inherited from class java.util.AbstractQueue
addAll, element, remove
 
Methods inherited from class java.util.AbstractCollection
contains, containsAll, isEmpty, remove, removeAll, retainAll, toArray, toArray, toString
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface java.util.concurrent.BlockingQueue
contains, remove
 
Methods inherited from interface java.util.Queue
element, remove
 
Methods inherited from interface java.util.Collection
addAll, containsAll, equals, hashCode, isEmpty, removeAll, retainAll, toArray, toArray
 

Field Detail

ANTI_STARVATION_INTERVAL

public static final long ANTI_STARVATION_INTERVAL
Frequency, in milliseconds, of the anti-starvation check.

See Also:
Constant Field Values

priorities

protected int priorities

queues

protected DelayQueue<PriorityDelayQueue.QueueElement<E>>[] queues

lock

protected final transient ReentrantLock lock

currentSize

protected AtomicInteger currentSize
Constructor Detail

PriorityDelayQueue

public PriorityDelayQueue(int priorities,
                          long maxWait,
                          TimeUnit unit,
                          int maxSize)
Create a PriorityDelayQueue.

Parameters:
priorities - number of priorities the queue will support.
maxWait - max wait time for elements before they are promoted to the next higher priority.
unit - time unit of the max wait time.
maxSize - maximum size of the queue, -1 means unbounded.
Method Detail

getPriorities

public int getPriorities()
Return the number of priorities the queue supports.

Returns:
number of priorities the queue supports.

getMaxWait

public long getMaxWait(TimeUnit unit)
Return the max wait time for elements before they are promoted to the next higher priority.

Parameters:
unit - time unit of the max wait time.
Returns:
the max wait time in the specified time unit.
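
The unit handling of getMaxWait can be pictured with the JDK TimeUnit conversions. The sketch below assumes the wait time is stored internally in milliseconds, which is a guess for illustration; MaxWaitSketch is a hypothetical stand-in, not this class.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in showing only the unit conversion.
class MaxWaitSketch {
    private final long maxWaitMillis; // assumption: stored internally in milliseconds

    MaxWaitSketch(long maxWait, TimeUnit unit) {
        this.maxWaitMillis = unit.toMillis(maxWait);
    }

    long getMaxWait(TimeUnit unit) {
        // TimeUnit.convert truncates toward zero when converting to a coarser unit.
        return unit.convert(maxWaitMillis, TimeUnit.MILLISECONDS);
    }
}
```

Under that assumption, asking for a coarser unit than the one given to the constructor can return 0, because the conversion truncates.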

getMaxSize

public long getMaxSize()
Return the maximum queue size.

Returns:
the maximum queue size. If -1 the queue is unbounded.

iterator

public Iterator<PriorityDelayQueue.QueueElement<E>> iterator()
Return an iterator over all the PriorityDelayQueue.QueueElement elements (both expired and unexpired) in this queue. The iterator does not return the elements in any particular order. The returned Iterator is a "weakly consistent" iterator that will never throw ConcurrentModificationException, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

Specified by:
iterator in interface Iterable<PriorityDelayQueue.QueueElement<E>>
Specified by:
iterator in interface Collection<PriorityDelayQueue.QueueElement<E>>
Specified by:
iterator in class AbstractCollection<PriorityDelayQueue.QueueElement<E>>
Returns:
an iterator over the PriorityDelayQueue.QueueElement elements in this queue.
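
The "weakly consistent" guarantee can be observed on the JDK DelayQueue that backs each sub-queue. The following sketch uses a plain DelayQueue rather than PriorityDelayQueue itself; the Ready element and the countWhileModifying helper are hypothetical names for this example.

```java
import java.util.Iterator;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class WeaklyConsistentIterationSketch {

    // Hypothetical element that is always expired (zero delay).
    static final class Ready implements Delayed {
        final int id;

        Ready(int id) {
            this.id = id;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return 0;
        }

        @Override
        public int compareTo(Delayed other) {
            return 0;
        }
    }

    // Counts the elements seen while the queue is modified during the iteration.
    static int countWhileModifying(DelayQueue<Ready> queue) {
        int seen = 0;
        for (Iterator<Ready> it = queue.iterator(); it.hasNext(); ) {
            it.next();
            seen++;
            // A fail-fast iterator would throw ConcurrentModificationException here.
            queue.offer(new Ready(1000 + seen));
        }
        return seen;
    }
}
```

The iteration completes without an exception and visits the elements that were present when the iterator was created, while the additions made during the loop remain in the queue.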

size

public int size()
Return the number of elements in the queue.

Specified by:
size in interface Collection<PriorityDelayQueue.QueueElement<E>>
Specified by:
size in class AbstractCollection<PriorityDelayQueue.QueueElement<E>>
Returns:
the number of elements in the queue.

sizes

public int[] sizes()
Return the number of elements on each priority sub-queue.

Returns:
the number of elements on each priority sub-queue.

add

public boolean add(PriorityDelayQueue.QueueElement<E> queueElement)
Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and throwing an IllegalStateException if no space is currently available. When using a capacity-restricted queue, it is generally preferable to use offer.

Specified by:
add in interface Collection<PriorityDelayQueue.QueueElement<E>>
Specified by:
add in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
Specified by:
add in interface Queue<PriorityDelayQueue.QueueElement<E>>
Overrides:
add in class AbstractQueue<PriorityDelayQueue.QueueElement<E>>
Parameters:
queueElement - the PriorityDelayQueue.QueueElement element to add.
Returns:
true (as specified by Collection.add(E))
Throws:
IllegalStateException - if the element cannot be added at this time due to capacity restrictions
ClassCastException - if the class of the specified element prevents it from being added to this queue
NullPointerException - if the specified element is null
IllegalArgumentException - if some property of the specified element prevents it from being added to this queue

offer

public boolean offer(PriorityDelayQueue.QueueElement<E> queueElement)
Insert the specified element into the queue.

The element is added with minimum priority and no delay.

Specified by:
offer in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
Specified by:
offer in interface Queue<PriorityDelayQueue.QueueElement<E>>
Parameters:
queueElement - the element to add.
Returns:
true if the element has been inserted, false if the element was not inserted (the queue has reached its maximum size).
Throws:
NullPointerException - if the specified element is null
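
The return contract above (false once the maximum size is reached, -1 meaning unbounded) can be sketched with an atomic counter, in the spirit of the currentSize field. This is an illustration under assumptions, not the actual implementation: BoundedOfferSketch and its internal queue are hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a size-bounded, non-blocking offer.
class BoundedOfferSketch<T> {
    private final int maxSize; // -1 means unbounded
    private final AtomicInteger currentSize = new AtomicInteger();
    private final Queue<T> items = new ConcurrentLinkedQueue<T>();

    BoundedOfferSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    boolean offer(T item) {
        if (item == null) {
            throw new NullPointerException("item");
        }
        // Reserve a slot first, then back out if the reservation exceeds the bound.
        int newSize = currentSize.incrementAndGet();
        if (maxSize != -1 && newSize > maxSize) {
            currentSize.decrementAndGet();
            return false;
        }
        items.add(item);
        return true;
    }

    int size() {
        return currentSize.get();
    }
}
```

Reserving the slot before inserting avoids a separate lock for the size check; whether the real class takes this route is not stated in this documentation.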

poll

public PriorityDelayQueue.QueueElement<E> poll()
Retrieve and remove the head of this queue, or return null if this queue has no elements with an expired delay.

The retrieved element is the oldest one from the highest priority sub-queue.

Invocations of this method run the anti-starvation check (at most once per interval).

Specified by:
poll in interface Queue<PriorityDelayQueue.QueueElement<E>>
Returns:
the head of this queue, or null if this queue has no elements with an expired delay.

peek

public PriorityDelayQueue.QueueElement<E> peek()
Retrieve, but do not remove, the head of this queue, or return null if this queue is empty. Unlike poll, if no expired elements are available in the queue, this method returns the element that will expire next, if one exists.

Specified by:
peek in interface Queue<PriorityDelayQueue.QueueElement<E>>
Returns:
the head of this queue, or null if this queue is empty.
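
The difference between peek and poll described above matches the behaviour of the JDK DelayQueue that this class builds on. The sketch below shows it on a plain DelayQueue, on the assumption that PriorityDelayQueue behaves the same way per sub-queue; the Task element is a hypothetical name for this example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class PeekVsPollSketch {

    // Hypothetical delayed element that becomes available after the given delay.
    static final class Task implements Delayed {
        final String name;
        final long readyAtNanos;

        Task(String name, long delay, TimeUnit unit) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // This sketch only ever compares Task instances.
            return Long.compare(readyAtNanos, ((Task) other).readyAtNanos);
        }
    }

    public static void main(String[] args) {
        DelayQueue<Task> queue = new DelayQueue<Task>();
        queue.offer(new Task("later", 1, TimeUnit.HOURS));

        Task polled = queue.poll(); // null: no element has an expired delay yet
        Task peeked = queue.peek(); // the element that will expire next
        System.out.println("poll -> " + polled);
        System.out.println("peek -> " + peeked.name);
    }
}
```

A caller that only needs to know whether work is ready should therefore use poll, since a non-null result from peek does not imply that the element is available yet.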

antiStarvation

protected void antiStarvation()
Run the anti-starvation check every ANTI_STARVATION_INTERVAL milliseconds.

It promotes elements that have exceeded the max wait time to the next higher priority sub-queue.


debug

protected void debug(String msgTemplate,
                     Object... msgArgs)
Method for debugging purposes. This implementation is a NOP.

This method should be overridden for logging purposes.

Message templates used by this class are in JDK's MessageFormat syntax.

Parameters:
msgTemplate - message template.
msgArgs - arguments for the message template.
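
Since the templates use the JDK MessageFormat syntax, an overriding subclass would typically render them with java.text.MessageFormat before logging. The sketch below shows only that rendering step; the render helper and the sample template are hypothetical and are not messages emitted by this class.

```java
import java.text.MessageFormat;
import java.util.Locale;

class DebugTemplateSketch {

    // Renders a MessageFormat template; a logging override could pass the result to its logger.
    static String render(String msgTemplate, Object... msgArgs) {
        // Locale.ROOT keeps number formatting independent of the machine's default locale.
        return new MessageFormat(msgTemplate, Locale.ROOT).format(msgArgs);
    }

    public static void main(String[] args) {
        // Hypothetical template, for illustration only.
        System.out.println(render("promoted {0} from P[{1}] to P[{2}]", "job-7", 0, 1));
    }
}
```

Note that in MessageFormat syntax a single quote is an escape character, so templates containing apostrophes need them doubled.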

put

public void put(PriorityDelayQueue.QueueElement<E> e)
         throws InterruptedException
Insert the specified element into this queue, waiting if necessary for space to become available.

NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.

Specified by:
put in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
Parameters:
e - the element to add
Throws:
InterruptedException - if interrupted while waiting
ClassCastException - if the class of the specified element prevents it from being added to this queue
NullPointerException - if the specified element is null
IllegalArgumentException - if some property of the specified element prevents it from being added to this queue

offer

public boolean offer(PriorityDelayQueue.QueueElement<E> e,
                     long timeout,
                     TimeUnit unit)
              throws InterruptedException
Insert the specified element into this queue, waiting up to the specified wait time if necessary for space to become available.

IMPORTANT: This implementation forces the addition of the element to the queue regardless of the queue's current size. The timeout value is ignored because the element is added immediately.

NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.

Specified by:
offer in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
Parameters:
e - the element to add
timeout - how long to wait before giving up, in units of unit
unit - a TimeUnit determining how to interpret the timeout parameter
Returns:
true if successful, or false if the specified waiting time elapses before space is available
Throws:
InterruptedException - if interrupted while waiting
ClassCastException - if the class of the specified element prevents it from being added to this queue
NullPointerException - if the specified element is null
IllegalArgumentException - if some property of the specified element prevents it from being added to this queue

take

public PriorityDelayQueue.QueueElement<E> take()
                                        throws InterruptedException
Retrieve and remove the head of this queue, waiting if necessary until an element becomes available.

IMPORTANT: When the queue is empty, this implementation can take up to 10ms to detect that a new element is available, because it sleeps for 10ms between checks.

NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.

Specified by:
take in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
Returns:
the head of this queue
Throws:
InterruptedException - if interrupted while waiting
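
The polling behaviour described above (a 10ms sleep between checks) can be sketched as a loop around a non-blocking poll. This is an assumption about the general shape, not the Oozie source; PollingTakeSketch and SLEEP_MILLIS are hypothetical names.

```java
import java.util.Queue;

class PollingTakeSketch {
    static final long SLEEP_MILLIS = 10; // mirrors the 10ms sleep mentioned above

    // Blocks by polling: returns the head as soon as a poll finds one.
    static <T> T take(Queue<T> queue) throws InterruptedException {
        T head = queue.poll();
        while (head == null) {
            Thread.sleep(SLEEP_MILLIS); // throws InterruptedException if interrupted
            head = queue.poll();
        }
        return head;
    }
}
```

The trade-off of such a loop is simplicity against latency: a new element may wait up to one sleep period before a blocked consumer notices it.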

poll

public PriorityDelayQueue.QueueElement<E> poll(long timeout,
                                               TimeUnit unit)
                                        throws InterruptedException
Retrieve and remove the head of this queue, waiting up to the specified wait time if necessary for an element to become available.

NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.

Specified by:
poll in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
Parameters:
timeout - how long to wait before giving up, in units of unit
unit - a TimeUnit determining how to interpret the timeout parameter
Returns:
the head of this queue, or null if the specified waiting time elapses before an element is available
Throws:
InterruptedException - if interrupted while waiting

remainingCapacity

public int remainingCapacity()
Return the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.

Note that you cannot always tell if an attempt to insert an element will succeed by inspecting remainingCapacity because it may be the case that another thread is about to insert or remove an element.

NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.

Specified by:
remainingCapacity in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
Returns:
the remaining capacity
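
Following only the contract documented above, the computation could look like the sketch below. The actual implementation may differ; the static helper and its parameters are hypothetical. The clamp to zero accounts for the timed offer, which is documented to add elements regardless of the current size.

```java
class RemainingCapacitySketch {

    // maxSize of -1 means unbounded, as documented for the constructor.
    static int remainingCapacity(int maxSize, int currentSize) {
        if (maxSize == -1) {
            return Integer.MAX_VALUE;
        }
        // Forced insertions can push currentSize past maxSize, so never report a negative value.
        return Math.max(0, maxSize - currentSize);
    }
}
```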

drainTo

public int drainTo(Collection<? super PriorityDelayQueue.QueueElement<E>> c)
Remove all available elements from this queue and add them to the given collection. This operation may be more efficient than repeatedly polling this queue. A failure encountered while attempting to add elements to collection c may result in elements being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue to itself result in IllegalArgumentException. Further, the behavior of this operation is undefined if the specified collection is modified while the operation is in progress.

NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.

Specified by:
drainTo in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
Parameters:
c - the collection to transfer elements into
Returns:
the number of elements transferred
Throws:
UnsupportedOperationException - if addition of elements is not supported by the specified collection
ClassCastException - if the class of an element of this queue prevents it from being added to the specified collection
NullPointerException - if the specified collection is null
IllegalArgumentException - if the specified collection is this queue, or some property of an element of this queue prevents it from being added to the specified collection

drainTo

public int drainTo(Collection<? super PriorityDelayQueue.QueueElement<E>> c,
                   int maxElements)
Remove at most the given number of available elements from this queue and add them to the given collection. A failure encountered while attempting to add elements to collection c may result in elements being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue to itself result in IllegalArgumentException. Further, the behavior of this operation is undefined if the specified collection is modified while the operation is in progress.

NOTE: This method is to fulfill the BlockingQueue interface. Not implemented in the most optimal way.

Specified by:
drainTo in interface BlockingQueue<PriorityDelayQueue.QueueElement<E>>
Parameters:
c - the collection to transfer elements into
maxElements - the maximum number of elements to transfer
Returns:
the number of elements transferred
Throws:
UnsupportedOperationException - if addition of elements is not supported by the specified collection
ClassCastException - if the class of an element of this queue prevents it from being added to the specified collection
NullPointerException - if the specified collection is null
IllegalArgumentException - if the specified collection is this queue, or some property of an element of this queue prevents it from being added to the specified collection
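
Both drainTo variants can be pictured as repeated polling with the argument checks listed above. The sketch below works on any java.util.Queue and is not the Oozie implementation; DrainSketch is a hypothetical helper. The single-argument variant corresponds to passing Integer.MAX_VALUE as maxElements.

```java
import java.util.Collection;
import java.util.Queue;

class DrainSketch {

    // Moves up to maxElements available elements from source into target.
    static <T> int drainTo(Queue<T> source, Collection<? super T> target, int maxElements) {
        if (target == null) {
            throw new NullPointerException("target");
        }
        if (target == source) {
            throw new IllegalArgumentException("cannot drain a queue to itself");
        }
        int moved = 0;
        T head;
        // poll() returns null when nothing is available, which ends the loop.
        while (moved < maxElements && (head = source.poll()) != null) {
            target.add(head);
            moved++;
        }
        return moved;
    }
}
```

If target.add throws partway through, the element already polled is in neither collection, which is the failure mode the description warns about.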

clear

public void clear()
Removes all of the elements from this queue. The queue will be empty after this call returns.

Specified by:
clear in interface Collection<PriorityDelayQueue.QueueElement<E>>
Overrides:
clear in class AbstractQueue<PriorityDelayQueue.QueueElement<E>>


Copyright © 2012 Apache Software Foundation. All Rights Reserved.