001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import java.util.AbstractQueue;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.ConcurrentModificationException;
025import java.util.Iterator;
026import java.util.List;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.DelayQueue;
029import java.util.concurrent.Delayed;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.locks.ReentrantLock;
033
/**
 * A Queue implementation that supports queuing elements into the future and priority queuing.
 * <p/>
 * The {@link PriorityDelayQueue} avoids starvation by raising elements priority as they age.
 * <p/>
 * To support queuing elements into the future, the JDK <code>DelayQueue</code> is used.
 * <p/>
 * To support priority queuing, an array of <code>DelayQueue</code> sub-queues is used. Elements are consumed from the
 * higher priority sub-queues first. From a sub-queue, elements are available based on their age.
 * <p/>
 * To avoid starvation, there is a maximum wait time for an element in a sub-queue. After the maximum wait time has
 * elapsed, the element is promoted to the next higher priority sub-queue. Eventually it will reach the maximum priority
 * sub-queue and it will be consumed when it is the oldest element in that sub-queue.
 * <p/>
 * Every time an element is promoted to a higher priority sub-queue, a new maximum wait time applies.
 * <p/>
 * This class does not use a separate thread for the anti-starvation check, instead, the check is performed on polling
 * and peeking operations. This check is performed at most once every 1/2 second.
 * <p/>
 * Removal operations (<code>poll</code>, <code>peek</code>, <code>drainTo</code>, <code>clear</code>) and
 * <code>iterator</code> hold the queue {@link #lock}; insertions rely on the sub-queues own synchronization.
 */
public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>>
        implements BlockingQueue<PriorityDelayQueue.QueueElement<E>> {

    /**
     * Element wrapper required by the queue.
     * <p/>
     * This wrapper keeps track of the priority and the age of a queue element.
     */
    public static class QueueElement<E> implements Delayed {
        private final E element;
        private int priority;
        // Absolute time, in milliseconds since the epoch, at which the delay of the element expires.
        private long baseTime;
        // Set by the owning queue while the wrapper is held by it, used to reject double insertions.
        boolean inQueue;

        /**
         * Create an Element wrapper.
         *
         * @param element element.
         * @param priority priority of the element.
         * @param delay delay of the element.
         * @param unit time unit of the delay.
         *
         * @throws IllegalArgumentException if the element is <tt>NULL</tt>, the priority is negative or if the delay is
         * negative.
         */
        public QueueElement(E element, int priority, long delay, TimeUnit unit) {
            if (element == null) {
                throw new IllegalArgumentException("element cannot be null");
            }
            if (priority < 0) {
                throw new IllegalArgumentException("priority cannot be negative, [" + element + "]");
            }
            if (delay < 0) {
                throw new IllegalArgumentException("delay cannot be negative");
            }
            this.element = element;
            this.priority = priority;
            setDelay(delay, unit);
        }

        /**
         * Create an Element wrapper with no delay and minimum priority.
         *
         * @param element element.
         */
        public QueueElement(E element) {
            this(element, 0, 0, TimeUnit.MILLISECONDS);
        }

        /**
         * Return the element from the wrapper.
         *
         * @return the element.
         */
        public E getElement() {
            return element;
        }

        /**
         * Return the priority of the element.
         *
         * @return the priority of the element.
         */
        public int getPriority() {
            return priority;
        }

        /**
         * Set the delay of the element, counted from the moment this method is invoked.
         *
         * @param delay delay of the element.
         * @param unit time unit of the delay.
         */
        public void setDelay(long delay, TimeUnit unit) {
            baseTime = System.currentTimeMillis() + unit.toMillis(delay);
        }

        /**
         * Return the remaining delay of the element. A zero or negative value means the delay has expired.
         *
         * @param unit time unit of the delay.
         *
         * @return the delay in the specified time unit.
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(baseTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        /**
         * Compare the age of this wrapper element with another. The priority is not used for the comparison.
         *
         * @param o the other wrapper element to compare with.
         *
         * @return less than zero if this wrapper is older, zero if both wrapper elements have the same age, greater
         *         than zero if the parameter wrapper element is older.
         */
        @Override
        public int compareTo(Delayed o) {
            long diff = (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
            if (diff > 0) {
                return 1;
            }
            else if (diff < 0) {
                return -1;
            }
            else {
                return 0;
            }
        }

        /**
         * Return the string representation of the wrapper element.
         *
         * @return the string representation of the wrapper element.
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("[").append(element).append("] priority=").append(priority).append(" delay=").
                    append(getDelay(TimeUnit.MILLISECONDS));
            return sb.toString();
        }

    }

    /**
     * Frequency, in milliseconds, of the anti-starvation check.
     */
    public static final long ANTI_STARVATION_INTERVAL = 500;

    /**
     * Interval, in milliseconds, between polling attempts of the blocking operations.
     */
    private static final long BLOCKING_POLL_INTERVAL = 10;

    protected int priorities;
    protected DelayQueue<QueueElement<E>>[] queues;
    protected transient final ReentrantLock lock = new ReentrantLock();
    private transient long lastAntiStarvationCheck = 0;
    private long maxWait;
    private int maxSize;
    // NULL when the queue is unbounded, otherwise the number of elements currently held by the queue.
    protected AtomicInteger currentSize;

    /**
     * Create a <code>PriorityDelayQueue</code>.
     *
     * @param priorities number of priorities the queue will support.
     * @param maxWait max wait time for elements before they are promoted to the next higher priority.
     * @param unit time unit of the max wait time.
     * @param maxSize maximum size of the queue, -1 means unbounded.
     *
     * @throws IllegalArgumentException if the number of priorities is less than 1, if the max wait time is negative
     * or if the maximum size is neither -1 nor greater than 0.
     */
    @SuppressWarnings("unchecked")
    public PriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
        if (priorities < 1) {
            throw new IllegalArgumentException("priorities must be 1 or more");
        }
        if (maxWait < 0) {
            throw new IllegalArgumentException("maxWait cannot be negative");
        }
        if (maxSize < -1 || maxSize == 0) {
            throw new IllegalArgumentException("maxSize must be -1 or greater than 0");
        }
        this.priorities = priorities;
        queues = new DelayQueue[priorities];
        for (int i = 0; i < priorities; i++) {
            queues[i] = new DelayQueue<QueueElement<E>>();
        }
        this.maxWait = unit.toMillis(maxWait);
        this.maxSize = maxSize;
        if (maxSize != -1) {
            currentSize = new AtomicInteger();
        }
    }

    /**
     * Return number of priorities the queue supports.
     *
     * @return number of priorities the queue supports.
     */
    public int getPriorities() {
        return priorities;
    }

    /**
     * Return the max wait time for elements before they are promoted to the next higher priority.
     *
     * @param unit time unit of the max wait time.
     *
     * @return the max wait time in the specified time unit.
     */
    public long getMaxWait(TimeUnit unit) {
        return unit.convert(maxWait, TimeUnit.MILLISECONDS);
    }

    /**
     * Return the maximum queue size.
     *
     * @return the maximum queue size. If <code>-1</code> the queue is unbounded.
     */
    public long getMaxSize() {
        return maxSize;
    }

    /**
     * Return an iterator over all the {@link QueueElement} elements (both expired and unexpired) in this queue. The
     * iterator does not return the elements in any particular order.  The returned <tt>Iterator</tt> works on a
     * snapshot of the queue taken upon construction of the iterator, thus it will never throw
     * {@link ConcurrentModificationException} and it does not reflect modifications done to the queue after its
     * construction. Removing elements through the iterator does not remove them from the queue.
     *
     * @return an iterator over the {@link QueueElement} elements in this queue.
     */
    @Override
    public Iterator<QueueElement<E>> iterator() {
        List<QueueElement<E>> snapshot = new ArrayList<QueueElement<E>>();
        lock.lock();
        try {
            for (DelayQueue<QueueElement<E>> queue : queues) {
                snapshot.addAll(queue);
            }
        }
        finally {
            lock.unlock();
        }
        return snapshot.iterator();
    }

    /**
     * Return the number of elements in the queue.
     *
     * @return the number of elements in the queue.
     */
    @Override
    public int size() {
        int size = 0;
        for (DelayQueue<QueueElement<E>> queue : queues) {
            size += queue.size();
        }
        return size;
    }

    /**
     * Return the number of elements on each priority sub-queue.
     *
     * @return the number of elements on each priority sub-queue, indexed by priority.
     */
    public int[] sizes() {
        int[] sizes = new int[queues.length];
        for (int i = 0; i < queues.length; i++) {
            sizes[i] = queues[i].size();
        }
        return sizes;
    }

    /**
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions.
     * <p/>
     * IMPORTANT: Unlike the general <tt>Collection#add</tt> contract, this implementation does not throw an
     * <tt>IllegalStateException</tt> when the queue is full, it behaves exactly like
     * {@link #offer(QueueElement) offer} and returns <tt>false</tt>.
     *
     * @param queueElement the {@link QueueElement} element to add.
     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
     *         has reached its maximum size).
     * @throws IllegalStateException if the element is already in a queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if the priority of the element is not supported by this queue
     */
    @Override
    public boolean add(QueueElement<E> queueElement) {
        return offer(queueElement, false);
    }

    /**
     * Insert the specified {@link QueueElement} element into the queue.
     *
     * @param queueElement the {@link QueueElement} element to add.
     * @param ignoreSize if the queue is bound to a maximum size and the maximum size is reached, this parameter (if set
     * to <tt>true</tt>) allows to ignore the maximum size and add the element to the queue.
     *
     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
     *         has reached its maximum size).
     *
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if the priority of the element is not supported by this queue
     * @throws IllegalStateException if the element is already in a queue
     */
    boolean offer(QueueElement<E> queueElement, boolean ignoreSize) {
        if (queueElement == null) {
            throw new NullPointerException("queueElement is NULL");
        }
        if (queueElement.getPriority() < 0 || queueElement.getPriority() >= priorities) {
            throw new IllegalArgumentException("priority out of range: " + queueElement);
        }
        if (queueElement.inQueue) {
            throw new IllegalStateException("queueElement already in a queue: " + queueElement);
        }
        if (!ignoreSize && currentSize != null && currentSize.get() >= maxSize) {
            return false;
        }
        boolean accepted = queues[queueElement.getPriority()].offer(queueElement);
        debug("offer([{0}]), to P[{1}] delay[{2}ms] accepted[{3}]", queueElement.getElement().toString(),
              queueElement.getPriority(), queueElement.getDelay(TimeUnit.MILLISECONDS), accepted);
        if (accepted) {
            if (currentSize != null) {
                currentSize.incrementAndGet();
            }
            queueElement.inQueue = true;
        }
        return accepted;
    }

    /**
     * Insert the specified element into the queue.
     * <p/>
     * The element is added to the sub-queue matching its priority, it becomes available once its delay expires.
     *
     * @param queueElement the element to add.
     *
     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
     *         has reached its maximum size).
     *
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(QueueElement<E> queueElement) {
        return offer(queueElement, false);
    }

    /**
     * Retrieve and remove the head of this queue, or return <tt>null</tt> if this queue has no elements with an expired
     * delay.
     * <p/>
     * The retrieved element is the oldest one from the highest priority sub-queue.
     * <p/>
     * Invocations to this method run the anti-starvation (once every interval check).
     *
     * @return the head of this queue, or <tt>null</tt> if this queue has no elements with an expired delay.
     */
    @Override
    public QueueElement<E> poll() {
        lock.lock();
        try {
            antiStarvation();
            // Highest priority sub-queue first, the first expired head found wins.
            for (int p = priorities - 1; p >= 0; p--) {
                QueueElement<E> e = queues[p].poll();
                if (e != null) {
                    if (currentSize != null) {
                        currentSize.decrementAndGet();
                    }
                    e.inQueue = false;
                    debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), p);
                    return e;
                }
            }
            return null;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieve, but does not remove, the head of this queue, or returns <tt>null</tt> if this queue is empty.  Unlike
     * <tt>poll</tt>, if no expired elements are available in the queue, this method returns the element that will
     * expire next, if one exists.
     * <p/>
     * Invocations to this method run the anti-starvation (once every interval check).
     *
     * @return the head of this queue, or <tt>null</tt> if this queue is empty.
     */
    @Override
    public QueueElement<E> peek() {
        lock.lock();
        try {
            antiStarvation();

            // Heads of the non-empty sub-queues, highest priority first.
            List<QueueElement<E>> heads = new ArrayList<QueueElement<E>>(priorities);
            for (int i = priorities - 1; i > -1; i--) {
                QueueElement<E> head = queues[i].peek();
                debug("peek(): considering [{0}] from P[{1}]", head, i);
                if (head != null) {
                    heads.add(head);
                }
            }

            // First choice: the expired head of the highest priority sub-queue, it is what poll() would return.
            QueueElement<E> e = null;
            for (QueueElement<E> head : heads) {
                if (head.getDelay(TimeUnit.MILLISECONDS) > 0) {
                    debug("peek, ignoring [{0}]", head);
                }
                else {
                    e = head;
                    debug("peek(): choosing [{0}]", e);
                    break;
                }
            }

            // Nothing has expired yet: the head expiring first wins, ties go to the higher priority.
            if (e == null) {
                for (QueueElement<E> head : heads) {
                    if (e == null) {
                        e = head;
                        debug("peek(): initial choosing [{0}]", e);
                    }
                    else if (head.getDelay(TimeUnit.MILLISECONDS) < e.getDelay(TimeUnit.MILLISECONDS)) {
                        debug("peek(): choosing [{0}] over [{1}]", head, e);
                        e = head;
                    }
                }
            }

            if (e != null) {
                debug("peek(): [{0}], from P[{1}]", e.getElement().toString(), e.getPriority());
            }
            else {
                debug("peek(): NULL");
            }
            return e;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Run the anti-starvation check every {@link #ANTI_STARVATION_INTERVAL} milliseconds.
     * <p/>
     * It promotes elements beyond max wait time to the next higher priority sub-queue.
     * <p/>
     * Within this class it is always invoked while holding the queue {@link #lock}.
     */
    protected void antiStarvation() {
        long now = System.currentTimeMillis();
        if (now - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL) {
            for (int i = 0; i < queues.length - 1; i++) {
                antiStarvation(queues[i], queues[i + 1], "from P[" + i + "] to P[" + (i + 1) + "]");
            }
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < queues.length; i++) {
                sb.append("P[").append(i).append("]=").append(queues[i].size()).append(" ");
            }
            debug("sub-queue sizes: {0}", sb.toString());
            lastAntiStarvationCheck = System.currentTimeMillis();
        }
    }

    /**
     * Promote elements beyond max wait time from a lower priority sub-queue to a higher priority sub-queue.
     * <p/>
     * Promoted elements get their delay reset to zero, thus a new max wait time applies to them in the higher
     * priority sub-queue.
     *
     * @param lowerQ lower priority sub-queue.
     * @param higherQ higher priority sub-queue.
     * @param msg sub-queues msg (from-to) for debugging purposes.
     */
    private void antiStarvation(DelayQueue<QueueElement<E>> lowerQ, DelayQueue<QueueElement<E>> higherQ, String msg) {
        int moved = 0;
        QueueElement<E> e = lowerQ.poll();
        while (e != null && e.getDelay(TimeUnit.MILLISECONDS) < -maxWait) {
            e.setDelay(0, TimeUnit.MILLISECONDS);
            if (!higherQ.offer(e)) {
                throw new IllegalStateException("Could not move element to higher sub-queue, element rejected");
            }
            e.priority++;
            e = lowerQ.poll();
            moved++;
        }
        // The last polled element has not waited long enough, it goes back to the sub-queue it came from.
        if (e != null) {
            if (!lowerQ.offer(e)) {
                throw new IllegalStateException("Could not reinsert element to current sub-queue, element rejected");
            }
        }
        debug("anti-starvation, moved {0} element(s) {1}", moved, msg);
    }

    /**
     * Method for debugging purposes. This implementation is a <tt>NOP</tt>.
     * <p/>
     * This method should be overridden for logging purposes.
     * <p/>
     * Message templates used by this class are in JDK's <tt>MessageFormat</tt> syntax.
     *
     * @param msgTemplate message template.
     * @param msgArgs arguments for the message template.
     */
    protected void debug(String msgTemplate, Object... msgArgs) {
    }

    /**
     * Insert the specified element into this queue.
     * <p/>
     * IMPORTANT: This implementation forces the addition of the element to the queue regardless
     * of the queue current size, it retries every 10ms only if a sub-queue rejects the element.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws IllegalStateException if the element is already in a queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if the priority of the element is not supported by this queue
     */
    @Override
    public void put(QueueElement<E> e) throws InterruptedException {
        while (!offer(e, true)) {
            Thread.sleep(BLOCKING_POLL_INTERVAL);
        }
    }

    /**
     * Insert the specified element into this queue.
     * <p/>
     * IMPORTANT: This implementation forces the addition of the element to the queue regardless
     * of the queue current size. The timeout value is ignored as the element is added immediately.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @param e the element to add
     * @param timeout ignored.
     * @param unit ignored.
     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the sub-queue rejected it.
     * @throws InterruptedException never thrown by this implementation.
     * @throws IllegalStateException if the element is already in a queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if the priority of the element is not supported by this queue
     */
    @Override
    public boolean offer(QueueElement<E> e, long timeout, TimeUnit unit) throws InterruptedException {
        return offer(e, true);
    }

    /**
     * Retrieve and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     * <p/>
     * IMPORTANT: This implementation has a delay of up to 10ms (when the queue is empty) to detect a new element
     * is available. It is doing a 10ms sleep.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public QueueElement<E> take() throws InterruptedException {
        QueueElement<E> e = poll();
        while (e == null) {
            Thread.sleep(BLOCKING_POLL_INTERVAL);
            e = poll();
        }
        return e;
    }

    /**
     * Retrieve and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     * <p/>
     * The queue is polled every 10ms (or less, if less time is left) until an element is available or the wait
     * time has elapsed.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public QueueElement<E> poll(long timeout, TimeUnit unit) throws InterruptedException {
        long timeoutMs = unit.toMillis(timeout);
        long start = System.currentTimeMillis();
        QueueElement<E> e = poll();
        while (e == null) {
            // Working with the elapsed time avoids overflowing when the timeout is very large.
            long remaining = timeoutMs - (System.currentTimeMillis() - start);
            if (remaining <= 0) {
                break;
            }
            Thread.sleep(Math.min(BLOCKING_POLL_INTERVAL, remaining));
            e = poll();
        }
        // The element is already out of the queue, it must be returned as is (polling again would lose it).
        return e;
    }

    /**
     * Return the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic
     * limit.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting <tt>remainingCapacity</tt>
     * because it may be the case that another thread is about to
     * insert or remove an element.
     * <p/>
     * If the queue holds more elements than its maximum size (elements can be forced in), zero is returned.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @return the remaining capacity
     */
    @Override
    public int remainingCapacity() {
        if (maxSize == -1) {
            return Integer.MAX_VALUE;
        }
        return Math.max(0, maxSize - size());
    }

    /**
     * Remove all available elements (the ones with an expired delay) from this queue and adds them
     * to the given collection.  The sub-queues are drained from the lowest to the highest priority.
     * A failure encountered while attempting to add elements to
     * collection <tt>c</tt> may result in elements being in neither
     * collection when the associated exception is
     * thrown.  Attempt to drain a queue to itself result in
     * <tt>IllegalArgumentException</tt>. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    @Override
    public int drainTo(Collection<? super QueueElement<E>> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Remove at most the given number of available elements (the ones with an expired delay) from
     * this queue and adds them to the given collection.  The sub-queues are drained from the lowest to the
     * highest priority.  A failure
     * encountered while attempting to add elements to
     * collection <tt>c</tt> may result in elements being in neither
     * collection when the associated exception is
     * thrown.  Attempt to drain a queue to itself result in
     * <tt>IllegalArgumentException</tt>. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    @Override
    public int drainTo(Collection<? super QueueElement<E>> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException("collection is NULL");
        }
        if (c == this) {
            throw new IllegalArgumentException("cannot drain a queue to itself");
        }
        if (maxElements <= 0) {
            return 0;
        }
        List<QueueElement<E>> drained = new ArrayList<QueueElement<E>>();
        lock.lock();
        try {
            int left = maxElements;
            for (DelayQueue<QueueElement<E>> q : queues) {
                if (left <= 0) {
                    break;
                }
                left -= q.drainTo(drained, left);
            }
            // Drained elements are out of the queue: they can be queued again and they free up capacity.
            for (QueueElement<E> e : drained) {
                e.inQueue = false;
            }
            if (currentSize != null) {
                currentSize.addAndGet(-drained.size());
            }
        }
        finally {
            lock.unlock();
        }
        // The target collection is foreign code, it is invoked without holding the lock.
        c.addAll(drained);
        return drained.size();
    }

    /**
     * Removes all of the elements (both expired and unexpired) that are in this queue when the method is invoked.
     * <p/>
     * The removed elements can be queued again and the capacity they were using is released.
     */
    @Override
    public void clear() {
        lock.lock();
        try {
            for (DelayQueue<QueueElement<E>> q : queues) {
                // Elements are removed one by one, this way only the elements effectively removed are accounted
                // for, even if elements are being offered to the sub-queue at the same time.
                for (Object o : q.toArray()) {
                    if (q.remove(o)) {
                        ((QueueElement<?>) o).inQueue = false;
                        if (currentSize != null) {
                            currentSize.decrementAndGet();
                        }
                    }
                }
            }
        }
        finally {
            lock.unlock();
        }
    }
}