001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import java.util.AbstractQueue;
022import java.util.ArrayList;
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.ConcurrentModificationException;
026import java.util.Iterator;
027import java.util.List;
028import java.util.concurrent.BlockingQueue;
029import java.util.concurrent.DelayQueue;
030import java.util.concurrent.Delayed;
031import java.util.concurrent.FutureTask;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.locks.ReentrantLock;
035
036/**
037 * A Queue implementation that support queuing elements into the future and priority queuing.
038 * <p>
039 * The {@link PriorityDelayQueue} avoids starvation by raising elements priority as they age.
040 * <p>
041 * To support queuing elements into the future, the JDK <code>DelayQueue</code> is used.
042 * <p>
043 * To support priority queuing, an array of <code>DelayQueue</code> sub-queues is used. Elements are consumed from the
044 * higher priority sub-queues first. From a sub-queue, elements are available based on their age.
045 * <p>
046 * To avoid starvation, there is is maximum wait time for an an element in a sub-queue, after the maximum wait time has
047 * elapsed, the element is promoted to the next higher priority sub-queue. Eventually it will reach the maximum priority
048 * sub-queue and it will be consumed when it is the oldest element in the that sub-queue.
049 * <p>
050 * Every time an element is promoted to a higher priority sub-queue, a new maximum wait time applies.
051 * <p>
052 * This class does not use a separate thread for anti-starvation check, instead, the check is performed on polling and
053 * seeking operations. This check is performed, the most every 1/2 second.
054 */
055public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>>
056        implements BlockingQueue<PriorityDelayQueue.QueueElement<E>> {
057
058    /**
059     * Element wrapper required by the queue.
060     * <p>
061     * This wrapper keeps track of the priority and the age of a queue element.
062     */
063    public static class QueueElement<E> extends FutureTask<E> implements Delayed {
064        private XCallable<E> element;
065        private int priority;
066        private long baseTime;
067        boolean inQueue;
068
069        /**
070         * Create an Element wrapper.
071         *
072         * @param element element.
073         * @param priority priority of the element.
074         * @param delay delay of the element.
075         * @param unit time unit of the delay.
076         *
077         * @throws IllegalArgumentException if the element is <tt>NULL</tt>, the priority is negative or if the delay is
078         * negative.
079         */
080        public QueueElement(XCallable<E> element, int priority, long delay, TimeUnit unit) {
081            super(element);
082            if (priority < 0) {
083                throw new IllegalArgumentException("priority cannot be negative, [" + element + "]");
084            }
085            if (delay < 0) {
086                throw new IllegalArgumentException("delay cannot be negative");
087            }
088            this.element = element;
089            this.priority = priority;
090            setDelay(delay, unit);
091        }
092
093        /**
094         * Return the element from the wrapper.
095         *
096         * @return the element.
097         */
098        public XCallable<E> getElement() {
099            return element;
100        }
101
102        /**
103         * Return the priority of the element.
104         *
105         * @return the priority of the element.
106         */
107        public int getPriority() {
108            return priority;
109        }
110
111        /**
112         * Set the delay of the element.
113         *
114         * @param delay delay of the element.
115         * @param unit time unit of the delay.
116         */
117        public void setDelay(long delay, TimeUnit unit) {
118            baseTime = System.currentTimeMillis() + unit.toMillis(delay);
119        }
120
121        /**
122         * Return the delay of the element.
123         *
124         * @param unit time unit of the delay.
125         *
126         * @return the delay in the specified time unit.
127         */
128        public long getDelay(TimeUnit unit) {
129            return unit.convert(baseTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
130        }
131
132        /**
133         * Compare the age of this wrapper element with another. The priority is not used for the comparision.
134         *
135         * @param o the other wrapper element to compare with.
136         *
137         * @return less than zero if this wrapper is older, zero if both wrapper elements have the same age, greater
138         *         than zero if the parameter wrapper element is older.
139         */
140        public int compareTo(Delayed o) {
141            long diff = (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
142            if(diff > 0) {
143                return 1;
144            } else if(diff < 0) {
145                return -1;
146            } else {
147                return 0;
148            }
149        }
150
151        /**
152         * Return the string representation of the wrapper element.
153         *
154         * @return the string representation of the wrapper element.
155         */
156        @Override
157        public String toString() {
158            StringBuilder sb = new StringBuilder();
159            sb.append("[").append(element).append("] priority=").append(priority).append(" delay=").
160                    append(getDelay(TimeUnit.MILLISECONDS));
161            return sb.toString();
162        }
163
164    }
165
166    /**
167     * Frequency, in milliseconds, of the anti-starvation check.
168     */
169    public static final long ANTI_STARVATION_INTERVAL = 500;
170
171    protected int priorities;
172    protected DelayQueue<QueueElement<E>>[] queues;
173    protected transient final ReentrantLock lock = new ReentrantLock();
174    private transient long lastAntiStarvationCheck = 0;
175    private long maxWait;
176    private int maxSize;
177    protected AtomicInteger currentSize;
178
179    /**
180     * Create a <code>PriorityDelayQueue</code>.
181     *
182     * @param priorities number of priorities the queue will support.
183     * @param maxWait max wait time for elements before they are promoted to the next higher priority.
184     * @param unit time unit of the max wait time.
185     * @param maxSize maximum size of the queue, -1 means unbounded.
186     */
187    @SuppressWarnings("unchecked")
188    public PriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
189        if (priorities < 1) {
190            throw new IllegalArgumentException("priorities must be 1 or more");
191        }
192        if (maxWait < 0) {
193            throw new IllegalArgumentException("maxWait must be greater than 0");
194        }
195        if (maxSize < -1 || maxSize == 0) {
196            throw new IllegalArgumentException("maxSize must be -1 or greater than 0");
197        }
198        this.priorities = priorities;
199        queues = new DelayQueue[priorities];
200        for (int i = 0; i < priorities; i++) {
201            queues[i] = new DelayQueue<QueueElement<E>>();
202        }
203        this.maxWait = unit.toMillis(maxWait);
204        this.maxSize = maxSize;
205        if (maxSize != -1) {
206            currentSize = new AtomicInteger();
207        }
208    }
209
210    /**
211     * Return number of priorities the queue supports.
212     *
213     * @return number of priorities the queue supports.
214     */
215    public int getPriorities() {
216        return priorities;
217    }
218
219    /**
220     * Return the max wait time for elements before they are promoted to the next higher priority.
221     *
222     * @param unit time unit of the max wait time.
223     *
224     * @return the max wait time in the specified time unit.
225     */
226    public long getMaxWait(TimeUnit unit) {
227        return unit.convert(maxWait, TimeUnit.MILLISECONDS);
228    }
229
230    /**
231     * Return the maximum queue size.
232     *
233     * @return the maximum queue size. If <code>-1</code> the queue is unbounded.
234     */
235    public long getMaxSize() {
236        return maxSize;
237    }
238
239    /**
240     * Return an iterator over all the {@link QueueElement} elements (both expired and unexpired) in this queue. The
241     * iterator does not return the elements in any particular order.  The returned <tt>Iterator</tt> is a "weakly
242     * consistent" iterator that will never throw {@link ConcurrentModificationException}, and guarantees to traverse
243     * elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any
244     * modifications subsequent to construction.
245     *
246     * @return an iterator over the {@link QueueElement} elements in this queue.
247     */
248    @Override
249    @SuppressWarnings("unchecked")
250    public Iterator<QueueElement<E>> iterator() {
251        QueueElement[][] queueElements = new QueueElement[queues.length][];
252        lock.lock();
253        try {
254            for (int i = 0; i < queues.length; i++) {
255                queueElements[i] = queues[i].toArray(new QueueElement[0]);
256            }
257        }
258        finally {
259            lock.unlock();
260        }
261        List<QueueElement<E>> list = new ArrayList<QueueElement<E>>();
262        for (QueueElement[] elements : queueElements) {
263            list.addAll(Arrays.asList((QueueElement<E>[]) elements));
264        }
265        return list.iterator();
266    }
267
268    /**
269     * Return the number of elements in the queue.
270     *
271     * @return the number of elements in the queue.
272     */
273    @Override
274    public int size() {
275        int size = 0;
276        for (DelayQueue<QueueElement<E>> queue : queues) {
277            size += queue.size();
278        }
279        return size;
280    }
281
282    /**
283     * Return the number of elements on each priority sub-queue.
284     *
285     * @return the number of elements on each priority sub-queue.
286     */
287    public int[] sizes() {
288        int[] sizes = new int[queues.length];
289        for (int i = 0; i < queues.length; i++) {
290            sizes[i] = queues[i].size();
291        }
292        return sizes;
293    }
294
295    /**
296     * Inserts the specified element into this queue if it is possible to do
297     * so immediately without violating capacity restrictions, returning
298     * <tt>true</tt> upon success and throwing an
299     * <tt>IllegalStateException</tt> if no space is currently available.
300     * When using a capacity-restricted queue, it is generally preferable to
301     * use {@link #offer(Object) offer}.
302     *
303     * @param queueElement the {@link QueueElement} element to add.
304     * @return <tt>true</tt> (as specified by {@link Collection#add})
305     * @throws IllegalStateException if the element cannot be added at this
306     *         time due to capacity restrictions
307     * @throws ClassCastException if the class of the specified element
308     *         prevents it from being added to this queue
309     * @throws NullPointerException if the specified element is null
310     * @throws IllegalArgumentException if some property of the specified
311     *         element prevents it from being added to this queue
312     */
313    @Override
314    public boolean add(QueueElement<E> queueElement) {
315        return offer(queueElement, false);
316    }
317
318    /**
319     * Insert the specified {@link QueueElement} element into the queue.
320     *
321     * @param queueElement the {@link QueueElement} element to add.
322     * @param ignoreSize if the queue is bound to a maximum size and the maximum size is reached, this parameter (if set
323     * to <tt>true</tt>) allows to ignore the maximum size and add the element to the queue.
324     *
325     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
326     *         has reached its maximum size).
327     *
328     * @throws NullPointerException if the specified element is null
329     */
330    boolean offer(QueueElement<E> queueElement, boolean ignoreSize) {
331        ParamChecker.notNull(queueElement, "queueElement");
332        if (queueElement.getPriority() < 0 || queueElement.getPriority() >= priorities) {
333            throw new IllegalArgumentException("priority out of range: " + queueElement);
334        }
335        if (queueElement.inQueue) {
336            throw new IllegalStateException("queueElement already in a queue: " + queueElement);
337        }
338        if (!ignoreSize && currentSize != null && currentSize.get() >= maxSize) {
339            return false;
340        }
341        boolean accepted = queues[queueElement.getPriority()].offer(queueElement);
342        debug("offer([{0}]), to P[{1}] delay[{2}ms] accepted[{3}]", queueElement.getElement().toString(),
343              queueElement.getPriority(), queueElement.getDelay(TimeUnit.MILLISECONDS), accepted);
344        if (accepted) {
345            if (currentSize != null) {
346                currentSize.incrementAndGet();
347            }
348            queueElement.inQueue = true;
349        }
350        return accepted;
351    }
352
353    /**
354     * Insert the specified element into the queue.
355     * <p>
356     * The element is added with minimun priority and no delay.
357     *
358     * @param queueElement the element to add.
359     *
360     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
361     *         has reached its maximum size).
362     *
363     * @throws NullPointerException if the specified element is null
364     */
365    @Override
366    public boolean offer(QueueElement<E> queueElement) {
367        return offer(queueElement, false);
368    }
369
370    /**
371     * Retrieve and remove the head of this queue, or return <tt>null</tt> if this queue has no elements with an expired
372     * delay.
373     * <p>
374     * The retrieved element is the oldest one from the highest priority sub-queue.
375     * <p>
376     * Invocations to this method run the anti-starvation (once every interval check).
377     *
378     * @return the head of this queue, or <tt>null</tt> if this queue has no elements with an expired delay.
379     */
380    @Override
381    public QueueElement<E> poll() {
382        lock.lock();
383        try {
384            antiStarvation();
385            QueueElement<E> e = null;
386            int i = priorities;
387            for (; e == null && i > 0; i--) {
388                e = queues[i - 1].poll();
389            }
390            if (e != null) {
391                if (currentSize != null) {
392                    currentSize.decrementAndGet();
393                }
394                e.inQueue = false;
395                debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i);
396            }
397            return e;
398        }
399        finally {
400            lock.unlock();
401        }
402    }
403
404    /**
405     * Retrieve, but does not remove, the head of this queue, or returns <tt>null</tt> if this queue is empty.  Unlike
406     * <tt>poll</tt>, if no expired elements are available in the queue, this method returns the element that will
407     * expire next, if one exists.
408     *
409     * @return the head of this queue, or <tt>null</tt> if this queue is empty.
410     */
411    @Override
412    public QueueElement<E> peek() {
413        lock.lock();
414        try {
415            antiStarvation();
416            QueueElement<E> e = null;
417
418            QueueElement<E> [] seeks = new QueueElement[priorities];
419            boolean foundElement = false;
420            for (int i = priorities - 1; i > -1; i--) {
421                e = queues[i].peek();
422                debug("peek(): considering [{0}] from P[{1}]", e, i);
423                seeks[priorities - i - 1] = e;
424                foundElement |= e != null;
425            }
426            if (foundElement) {
427                e = null;
428                for (int i = 0; e == null && i < priorities; i++) {
429                    if (seeks[i] != null && seeks[i].getDelay(TimeUnit.MILLISECONDS) > 0) {
430                        debug("peek, ignoring [{0}]", seeks[i]);
431                    }
432                    else {
433                        e = seeks[i];
434                    }
435                }
436                if (e != null) {
437                    debug("peek(): choosing [{0}]", e);
438                }
439                if (e == null) {
440                    int first;
441                    for (first = 0; e == null && first < priorities; first++) {
442                        e = seeks[first];
443                    }
444                    if (e != null) {
445                        debug("peek(): initial choosing [{0}]", e);
446                    }
447                    for (int i = first; i < priorities; i++) {
448                        QueueElement<E> ee = seeks[i];
449                        if (ee != null && ee.getDelay(TimeUnit.MILLISECONDS) < e.getDelay(TimeUnit.MILLISECONDS)) {
450                            debug("peek(): choosing [{0}] over [{1}]", ee, e);
451                            e = ee;
452                        }
453                    }
454                }
455            }
456            if (e != null) {
457                debug("peek(): [{0}], from P[{1}]", e.getElement().toString(), e.getPriority());
458            }
459            else {
460                debug("peek(): NULL");
461            }
462            return e;
463        }
464        finally {
465            lock.unlock();
466        }
467    }
468
469    /**
470     * Run the anti-starvation check every {@link #ANTI_STARVATION_INTERVAL} milliseconds.
471     * <p>
472     * It promotes elements beyond max wait time to the next higher priority sub-queue.
473     */
474    protected void antiStarvation() {
475        long now = System.currentTimeMillis();
476        if (now - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL) {
477            for (int i = 0; i < queues.length - 1; i++) {
478                antiStarvation(queues[i], queues[i + 1], "from P[" + i + "] to P[" + (i + 1) + "]");
479            }
480            StringBuilder sb = new StringBuilder();
481            for (int i = 0; i < queues.length; i++) {
482                sb.append("P[").append(i).append("]=").append(queues[i].size()).append(" ");
483            }
484            debug("sub-queue sizes: {0}", sb.toString());
485            lastAntiStarvationCheck = System.currentTimeMillis();
486        }
487    }
488
489    /**
490     * Promote elements beyond max wait time from a lower priority sub-queue to a higher priority sub-queue.
491     *
492     * @param lowerQ lower priority sub-queue.
493     * @param higherQ higher priority sub-queue.
494     * @param msg sub-queues msg (from-to) for debugging purposes.
495     */
496    private void antiStarvation(DelayQueue<QueueElement<E>> lowerQ, DelayQueue<QueueElement<E>> higherQ, String msg) {
497        int moved = 0;
498        QueueElement<E> e = lowerQ.poll();
499        while (e != null && e.getDelay(TimeUnit.MILLISECONDS) < -maxWait) {
500            e.setDelay(0, TimeUnit.MILLISECONDS);
501            if (!higherQ.offer(e)) {
502                throw new IllegalStateException("Could not move element to higher sub-queue, element rejected");
503            }
504            e.priority++;
505            e = lowerQ.poll();
506            moved++;
507        }
508        if (e != null) {
509            if (!lowerQ.offer(e)) {
510                throw new IllegalStateException("Could not reinsert element to current sub-queue, element rejected");
511            }
512        }
513        debug("anti-starvation, moved {0} element(s) {1}", moved, msg);
514    }
515
516    /**
517     * Method for debugging purposes. This implementation is a <tt>NOP</tt>.
518     * <p>
519     * This method should be overriden for logging purposes.
520     * <p>
521     * Message templates used by this class are in JDK's <tt>MessageFormat</tt> syntax.
522     *
523     * @param msgTemplate message template.
524     * @param msgArgs arguments for the message template.
525     */
526    protected void debug(String msgTemplate, Object... msgArgs) {
527    }
528
529    /**
530     * Insert the specified element into this queue, waiting if necessary
531     * for space to become available.
532     * <p>
533     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
534     *
535     * @param e the element to add
536     * @throws InterruptedException if interrupted while waiting
537     * @throws ClassCastException if the class of the specified element
538     *         prevents it from being added to this queue
539     * @throws NullPointerException if the specified element is null
540     * @throws IllegalArgumentException if some property of the specified
541     *         element prevents it from being added to this queue
542     */
543    @Override
544    public void put(QueueElement<E> e) throws InterruptedException {
545        while (!offer(e, true)) {
546            Thread.sleep(10);
547        }
548    }
549
550    /**
551     * Insert the specified element into this queue, waiting up to the
552     * specified wait time if necessary for space to become available.
553     * <p>
554     * IMPORTANT: This implementation forces the addition of the element to the queue regardless
555     * of the queue current size. The timeout value is ignored as the element is added immediately.
556     * <p>
557     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
558     *
559     * @param e the element to add
560     * @param timeout how long to wait before giving up, in units of
561     *        <tt>unit</tt>
562     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
563     *        <tt>timeout</tt> parameter
564     * @return <tt>true</tt> if successful, or <tt>false</tt> if
565     *         the specified waiting time elapses before space is available
566     * @throws InterruptedException if interrupted while waiting
567     * @throws ClassCastException if the class of the specified element
568     *         prevents it from being added to this queue
569     * @throws NullPointerException if the specified element is null
570     * @throws IllegalArgumentException if some property of the specified
571     *         element prevents it from being added to this queue
572     */
573    @Override
574    public boolean offer(QueueElement<E> e, long timeout, TimeUnit unit) throws InterruptedException {
575        return offer(e, true);
576    }
577
578    /**
579     * Retrieve and removes the head of this queue, waiting if necessary
580     * until an element becomes available.
581     * <p>
582     * IMPORTANT: This implementation has a delay of up to 10ms (when the queue is empty) to detect a new element
583     * is available. It is doing a 10ms sleep.
584     * <p>
585     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
586     *
587     * @return the head of this queue
588     * @throws InterruptedException if interrupted while waiting
589     */
590    @Override
591    public QueueElement<E> take() throws InterruptedException {
592        QueueElement<E> e = poll();
593        while (e == null) {
594            Thread.sleep(10);
595            e = poll();
596        }
597        return e;
598    }
599
600    /**
601     * Retrieve and removes the head of this queue, waiting up to the
602     * specified wait time if necessary for an element to become available.
603     * <p>
604     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
605     *
606     * @param timeout how long to wait before giving up, in units of
607     *        <tt>unit</tt>
608     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
609     *        <tt>timeout</tt> parameter
610     * @return the head of this queue, or <tt>null</tt> if the
611     *         specified waiting time elapses before an element is available
612     * @throws InterruptedException if interrupted while waiting
613     */
614    @Override
615    public QueueElement<E> poll(long timeout, TimeUnit unit) throws InterruptedException {
616        QueueElement<E> e = poll();
617        long time = System.currentTimeMillis() + unit.toMillis(timeout);
618        while (e == null && time > System.currentTimeMillis()) {
619            Thread.sleep(10);
620            e = poll();
621        }
622        return poll();
623    }
624
625    /**
626     * Return the number of additional elements that this queue can ideally
627     * (in the absence of memory or resource constraints) accept without
628     * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic
629     * limit.
630     *
631     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
632     * an element will succeed by inspecting <tt>remainingCapacity</tt>
633     * because it may be the case that another thread is about to
634     * insert or remove an element.
635     * <p>
636     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
637     *
638     * @return the remaining capacity
639     */
640    @Override
641    public int remainingCapacity() {
642        return (maxSize == -1) ? -1 : maxSize - size();
643    }
644
645    /**
646     * Remove all available elements from this queue and adds them
647     * to the given collection.  This operation may be more
648     * efficient than repeatedly polling this queue.  A failure
649     * encountered while attempting to add elements to
650     * collection <tt>c</tt> may result in elements being in neither,
651     * either or both collections when the associated exception is
652     * thrown.  Attempt to drain a queue to itself result in
653     * <tt>IllegalArgumentException</tt>. Further, the behavior of
654     * this operation is undefined if the specified collection is
655     * modified while the operation is in progress.
656     * <p>
657     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
658     *
659     * @param c the collection to transfer elements into
660     * @return the number of elements transferred
661     * @throws UnsupportedOperationException if addition of elements
662     *         is not supported by the specified collection
663     * @throws ClassCastException if the class of an element of this queue
664     *         prevents it from being added to the specified collection
665     * @throws NullPointerException if the specified collection is null
666     * @throws IllegalArgumentException if the specified collection is this
667     *         queue, or some property of an element of this queue prevents
668     *         it from being added to the specified collection
669     */
670    @Override
671    public int drainTo(Collection<? super QueueElement<E>> c) {
672        int count = 0;
673        for (DelayQueue<QueueElement<E>> q : queues) {
674            count += q.drainTo(c);
675        }
676        return count;
677    }
678
679    /**
680     * Remove at most the given number of available elements from
681     * this queue and adds them to the given collection.  A failure
682     * encountered while attempting to add elements to
683     * collection <tt>c</tt> may result in elements being in neither,
684     * either or both collections when the associated exception is
685     * thrown.  Attempt to drain a queue to itself result in
686     * <tt>IllegalArgumentException</tt>. Further, the behavior of
687     * this operation is undefined if the specified collection is
688     * modified while the operation is in progress.
689     * <p>
690     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
691     *
692     * @param c the collection to transfer elements into
693     * @param maxElements the maximum number of elements to transfer
694     * @return the number of elements transferred
695     * @throws UnsupportedOperationException if addition of elements
696     *         is not supported by the specified collection
697     * @throws ClassCastException if the class of an element of this queue
698     *         prevents it from being added to the specified collection
699     * @throws NullPointerException if the specified collection is null
700     * @throws IllegalArgumentException if the specified collection is this
701     *         queue, or some property of an element of this queue prevents
702     *         it from being added to the specified collection
703     */
704    @Override
705    public int drainTo(Collection<? super QueueElement<E>> c, int maxElements) {
706        int left = maxElements;
707        int count = 0;
708        for (DelayQueue<QueueElement<E>> q : queues) {
709            int drained = q.drainTo(c, left);
710            count += drained;
711            left -= drained;
712        }
713        return count;
714    }
715
716    /**
717     * Removes all of the elements from this queue. The queue will be empty after this call returns.
718     */
719    @Override
720    public void clear() {
721        for (DelayQueue<QueueElement<E>> q : queues) {
722            q.clear();
723        }
724    }
725}