001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.util;
019    
020    import java.util.AbstractQueue;
021    import java.util.ArrayList;
022    import java.util.Arrays;
023    import java.util.Collection;
024    import java.util.ConcurrentModificationException;
025    import java.util.Iterator;
026    import java.util.List;
027    import java.util.concurrent.BlockingQueue;
028    import java.util.concurrent.DelayQueue;
029    import java.util.concurrent.Delayed;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.atomic.AtomicInteger;
032    import java.util.concurrent.locks.ReentrantLock;
033    
/**
 * A Queue implementation that supports queuing elements into the future and priority queuing.
 * <p/>
 * The {@link PriorityDelayQueue} avoids starvation by raising elements priority as they age.
 * <p/>
 * To support queuing elements into the future, the JDK <code>DelayQueue</code> is used.
 * <p/>
 * To support priority queuing, an array of <code>DelayQueue</code> sub-queues is used. Elements are consumed from the
 * higher priority sub-queues first. From a sub-queue, elements are available based on their age.
 * <p/>
 * To avoid starvation, there is a maximum wait time for an element in a sub-queue; after the maximum wait time has
 * elapsed, the element is promoted to the next higher priority sub-queue. Eventually it will reach the maximum priority
 * sub-queue and it will be consumed when it is the oldest element in that sub-queue.
 * <p/>
 * Every time an element is promoted to a higher priority sub-queue, a new maximum wait time applies.
 * <p/>
 * This class does not use a separate thread for the anti-starvation check, instead, the check is performed on polling
 * and peeking operations. This check is performed at most once every {@link #ANTI_STARVATION_INTERVAL} milliseconds.
 */
public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>>
        implements BlockingQueue<PriorityDelayQueue.QueueElement<E>> {

    /**
     * Element wrapper required by the queue.
     * <p/>
     * This wrapper keeps track of the priority and the age of a queue element.
     */
    public static class QueueElement<E> implements Delayed {
        private final E element;
        private int priority;
        // absolute time, in milliseconds since the epoch, at which the delay expires
        private long baseTime;
        // set by the owning queue while the wrapper sits in one of its sub-queues
        boolean inQueue;

        /**
         * Create an Element wrapper.
         *
         * @param element element.
         * @param priority priority of the element.
         * @param delay delay of the element.
         * @param unit time unit of the delay.
         *
         * @throws IllegalArgumentException if the element is <tt>NULL</tt>, the priority is negative or if the delay is
         * negative.
         */
        public QueueElement(E element, int priority, long delay, TimeUnit unit) {
            if (element == null) {
                throw new IllegalArgumentException("element cannot be null");
            }
            if (priority < 0) {
                throw new IllegalArgumentException("priority cannot be negative, [" + element + "]");
            }
            if (delay < 0) {
                throw new IllegalArgumentException("delay cannot be negative");
            }
            this.element = element;
            this.priority = priority;
            setDelay(delay, unit);
        }

        /**
         * Create an Element wrapper with no delay and minimum priority.
         *
         * @param element element.
         */
        public QueueElement(E element) {
            this(element, 0, 0, TimeUnit.MILLISECONDS);
        }

        /**
         * Return the element from the wrapper.
         *
         * @return the element.
         */
        public E getElement() {
            return element;
        }

        /**
         * Return the priority of the element.
         *
         * @return the priority of the element.
         */
        public int getPriority() {
            return priority;
        }

        /**
         * Set the delay of the element, counted from the moment this method is invoked.
         *
         * @param delay delay of the element.
         * @param unit time unit of the delay.
         */
        public void setDelay(long delay, TimeUnit unit) {
            baseTime = System.currentTimeMillis() + unit.toMillis(delay);
        }

        /**
         * Return the remaining delay of the element. A zero or negative value means the delay has expired.
         *
         * @param unit time unit of the delay.
         *
         * @return the delay in the specified time unit.
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(baseTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        /**
         * Compare the age of this wrapper element with another. The priority is not used for the comparison.
         *
         * @param o the other wrapper element to compare with.
         *
         * @return less than zero if this wrapper is older, zero if both wrapper elements have the same age, greater
         *         than zero if the parameter wrapper element is older.
         */
        @Override
        public int compareTo(Delayed o) {
            if (o instanceof QueueElement) {
                // Compare the expiration instants directly: reading the clock once per operand (as getDelay() does)
                // could order two wrappers with the same expiration time inconsistently.
                long other = ((QueueElement<?>) o).baseTime;
                return (baseTime < other) ? -1 : ((baseTime > other) ? 1 : 0);
            }
            long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
            return (diff > 0) ? 1 : ((diff < 0) ? -1 : 0);
        }

        /**
         * Return the string representation of the wrapper element.
         *
         * @return the string representation of the wrapper element.
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("[").append(element).append("] priority=").append(priority).append(" delay=").
                    append(getDelay(TimeUnit.MILLISECONDS));
            return sb.toString();
        }

    }

    /**
     * Frequency, in milliseconds, of the anti-starvation check.
     */
    public static final long ANTI_STARVATION_INTERVAL = 500;

    protected int priorities;
    protected DelayQueue<QueueElement<E>>[] queues;
    protected transient final ReentrantLock lock = new ReentrantLock();
    private transient long lastAntiStarvationCheck = 0;
    private long maxWait;
    private int maxSize;
    // element counter used to enforce maxSize; NULL when the queue is unbounded
    protected AtomicInteger currentSize;

    /**
     * Create a <code>PriorityDelayQueue</code>.
     *
     * @param priorities number of priorities the queue will support.
     * @param maxWait max wait time for elements before they are promoted to the next higher priority.
     * @param unit time unit of the max wait time.
     * @param maxSize maximum size of the queue, -1 means unbounded.
     *
     * @throws IllegalArgumentException if <code>priorities</code> is less than 1, <code>maxWait</code> is negative or
     * <code>maxSize</code> is neither -1 nor greater than 0.
     */
    @SuppressWarnings("unchecked")
    public PriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
        if (priorities < 1) {
            throw new IllegalArgumentException("priorities must be 1 or more");
        }
        if (maxWait < 0) {
            throw new IllegalArgumentException("maxWait cannot be negative");
        }
        if (maxSize < -1 || maxSize == 0) {
            throw new IllegalArgumentException("maxSize must be -1 or greater than 0");
        }
        this.priorities = priorities;
        queues = new DelayQueue[priorities];
        for (int i = 0; i < priorities; i++) {
            queues[i] = new DelayQueue<QueueElement<E>>();
        }
        this.maxWait = unit.toMillis(maxWait);
        this.maxSize = maxSize;
        if (maxSize != -1) {
            currentSize = new AtomicInteger();
        }
    }

    /**
     * Return number of priorities the queue supports.
     *
     * @return number of priorities the queue supports.
     */
    public int getPriorities() {
        return priorities;
    }

    /**
     * Return the max wait time for elements before they are promoted to the next higher priority.
     *
     * @param unit time unit of the max wait time.
     *
     * @return the max wait time in the specified time unit.
     */
    public long getMaxWait(TimeUnit unit) {
        return unit.convert(maxWait, TimeUnit.MILLISECONDS);
    }

    /**
     * Return the maximum queue size.
     *
     * @return the maximum queue size. If <code>-1</code> the queue is unbounded.
     */
    public long getMaxSize() {
        return maxSize;
    }

    /**
     * Return an iterator over all the {@link QueueElement} elements (both expired and unexpired) in this queue. The
     * iterator does not return the elements in any particular order.  The returned <tt>Iterator</tt> is a "weakly
     * consistent" iterator that will never throw {@link ConcurrentModificationException}: it traverses a snapshot of
     * the elements taken upon construction of the iterator and does not reflect later additions.
     * <p/>
     * {@link Iterator#remove()} removes the last returned element from this queue (if it is still present).
     *
     * @return an iterator over the {@link QueueElement} elements in this queue.
     */
    @Override
    public Iterator<QueueElement<E>> iterator() {
        List<QueueElement<E>> snapshot = new ArrayList<QueueElement<E>>();
        lock.lock();
        try {
            for (DelayQueue<QueueElement<E>> queue : queues) {
                snapshot.addAll(queue);
            }
        }
        finally {
            lock.unlock();
        }
        final Iterator<QueueElement<E>> delegate = snapshot.iterator();
        return new Iterator<QueueElement<E>>() {
            private QueueElement<E> last;

            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public QueueElement<E> next() {
                last = delegate.next();
                return last;
            }

            @Override
            public void remove() {
                if (last == null) {
                    throw new IllegalStateException();
                }
                // Removing from the snapshot alone would leave the element in the queue.
                PriorityDelayQueue.this.remove(last);
                last = null;
            }
        };
    }

    /**
     * Return the number of elements in the queue.
     *
     * @return the number of elements in the queue.
     */
    @Override
    public int size() {
        int size = 0;
        for (DelayQueue<QueueElement<E>> queue : queues) {
            size += queue.size();
        }
        return size;
    }

    /**
     * Return the number of elements on each priority sub-queue.
     *
     * @return the number of elements on each priority sub-queue.
     */
    public int[] sizes() {
        int[] sizes = new int[queues.length];
        for (int i = 0; i < queues.length; i++) {
            sizes[i] = queues[i].size();
        }
        return sizes;
    }

    /**
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions.
     * <p/>
     * NOTE: This implementation delegates to {@link #offer(QueueElement)}; when the queue has reached its maximum size
     * it returns <tt>false</tt> instead of throwing an <tt>IllegalStateException</tt>.
     *
     * @param queueElement the {@link QueueElement} element to add.
     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
     *         has reached its maximum size).
     * @throws IllegalStateException if the element is already in a queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if the priority of the element is out of the range supported by this queue
     */
    @Override
    public boolean add(QueueElement<E> queueElement) {
        return offer(queueElement, false);
    }

    /**
     * Insert the specified {@link QueueElement} element into the queue.
     *
     * @param queueElement the {@link QueueElement} element to add.
     * @param ignoreSize if the queue is bound to a maximum size and the maximum size is reached, this parameter (if set
     * to <tt>true</tt>) allows to ignore the maximum size and add the element to the queue.
     *
     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
     *         has reached its maximum size).
     *
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if the priority of the element is out of the range supported by this queue
     * @throws IllegalStateException if the element is already in a queue
     */
    boolean offer(QueueElement<E> queueElement, boolean ignoreSize) {
        if (queueElement == null) {
            throw new NullPointerException("queueElement is NULL");
        }
        if (queueElement.getPriority() < 0 || queueElement.getPriority() >= priorities) {
            throw new IllegalArgumentException("priority out of range");
        }
        if (queueElement.inQueue) {
            throw new IllegalStateException("queueElement already in a queue");
        }
        if (!ignoreSize && currentSize != null && currentSize.get() >= maxSize) {
            return false;
        }
        boolean accepted;
        // Insert and do the bookkeeping under the same lock used by the removal operations, otherwise a concurrent
        // poll() could clear the inQueue flag before it is set here.
        lock.lock();
        try {
            accepted = queues[queueElement.getPriority()].offer(queueElement);
            debug("offer([{0}]), to P[{1}] delay[{2}ms] accepted[{3}]", queueElement.getElement().toString(),
                  queueElement.getPriority(), queueElement.getDelay(TimeUnit.MILLISECONDS), accepted);
            if (accepted) {
                if (currentSize != null) {
                    currentSize.incrementAndGet();
                }
                queueElement.inQueue = true;
            }
        }
        finally {
            lock.unlock();
        }
        return accepted;
    }

    /**
     * Insert the specified element into the queue, honoring the maximum size of the queue.
     * <p/>
     * The element is added to the sub-queue matching its priority.
     *
     * @param queueElement the element to add.
     *
     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
     *         has reached its maximum size).
     *
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(QueueElement<E> queueElement) {
        return offer(queueElement, false);
    }

    /**
     * Retrieve and remove the head of this queue, or return <tt>null</tt> if this queue has no elements with an expired
     * delay.
     * <p/>
     * The retrieved element is the oldest one from the highest priority sub-queue.
     * <p/>
     * Invocations to this method run the anti-starvation (once every interval check).
     *
     * @return the head of this queue, or <tt>null</tt> if this queue has no elements with an expired delay.
     */
    @Override
    public QueueElement<E> poll() {
        lock.lock();
        try {
            antiStarvation();
            QueueElement<E> e = null;
            int i = priorities;
            // when an element is found, the loop's final decrement leaves i at the index of its sub-queue
            for (; e == null && i > 0; i--) {
                e = queues[i - 1].poll();
            }
            if (e != null) {
                if (currentSize != null) {
                    currentSize.decrementAndGet();
                }
                e.inQueue = false;
                debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i);
            }
            return e;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieve, but does not remove, the head of this queue, or returns <tt>null</tt> if this queue is empty.  Unlike
     * <tt>poll</tt>, if no expired elements are available in the queue, this method returns the element that will
     * expire next, if one exists.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue is empty.
     */
    @Override
    @SuppressWarnings("unchecked")
    public QueueElement<E> peek() {
        lock.lock();
        try {
            antiStarvation();
            QueueElement<E> e = null;

            // heads of the sub-queues, ordered from the highest to the lowest priority
            QueueElement<E>[] seeks = new QueueElement[priorities];
            boolean foundElement = false;
            for (int i = priorities - 1; i > -1; i--) {
                e = queues[i].peek();
                debug("peek(): considering [{0}] from P[{1}]", e, i);
                seeks[priorities - i - 1] = e;
                foundElement |= e != null;
            }
            if (foundElement) {
                // first choice: the expired head of the highest priority sub-queue having one
                e = null;
                for (int i = 0; e == null && i < priorities; i++) {
                    if (seeks[i] != null && seeks[i].getDelay(TimeUnit.MILLISECONDS) > 0) {
                        debug("peek, ignoring [{0}]", seeks[i]);
                    }
                    else {
                        e = seeks[i];
                    }
                }
                if (e != null) {
                    debug("peek(): choosing [{0}]", e);
                }
                if (e == null) {
                    // no expired head: choose the head that expires first, ties favor the higher priority
                    int first;
                    for (first = 0; e == null && first < priorities; first++) {
                        e = seeks[first];
                    }
                    if (e != null) {
                        debug("peek(): initial choosing [{0}]", e);
                    }
                    for (int i = first; i < priorities; i++) {
                        QueueElement<E> ee = seeks[i];
                        if (ee != null && ee.getDelay(TimeUnit.MILLISECONDS) < e.getDelay(TimeUnit.MILLISECONDS)) {
                            debug("peek(): choosing [{0}] over [{1}]", ee, e);
                            e = ee;
                        }
                    }
                }
            }
            if (e != null) {
                debug("peek(): [{0}], from P[{1}]", e.getElement().toString(), e.getPriority());
            }
            else {
                debug("peek(): NULL");
            }
            return e;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Remove the specified {@link QueueElement} from this queue, if it is present in any of the sub-queues.
     * <p/>
     * The inherited implementation would remove the element from the snapshot used by the iterator only, leaving
     * the queue untouched.
     *
     * @param o the {@link QueueElement} to remove.
     *
     * @return <tt>true</tt> if the element was in the queue and has been removed, <tt>false</tt> otherwise.
     */
    @Override
    public boolean remove(Object o) {
        if (!(o instanceof QueueElement)) {
            return false;
        }
        lock.lock();
        try {
            for (DelayQueue<QueueElement<E>> queue : queues) {
                if (queue.remove(o)) {
                    if (currentSize != null) {
                        currentSize.decrementAndGet();
                    }
                    ((QueueElement<?>) o).inQueue = false;
                    return true;
                }
            }
            return false;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Run the anti-starvation check every {@link #ANTI_STARVATION_INTERVAL} milliseconds.
     * <p/>
     * It promotes elements beyond max wait time to the next higher priority sub-queue.
     */
    protected void antiStarvation() {
        long now = System.currentTimeMillis();
        if (now - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL) {
            for (int i = 0; i < queues.length - 1; i++) {
                antiStarvation(queues[i], queues[i + 1], "from P[" + i + "] to P[" + (i + 1) + "]");
            }
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < queues.length; i++) {
                sb.append("P[").append(i).append("]=").append(queues[i].size()).append(" ");
            }
            debug("sub-queue sizes: {0}", sb.toString());
            lastAntiStarvationCheck = System.currentTimeMillis();
        }
    }

    /**
     * Promote elements beyond max wait time from a lower priority sub-queue to a higher priority sub-queue.
     *
     * @param lowerQ lower priority sub-queue.
     * @param higherQ higher priority sub-queue.
     * @param msg sub-queues msg (from-to) for debugging purposes.
     */
    private void antiStarvation(DelayQueue<QueueElement<E>> lowerQ, DelayQueue<QueueElement<E>> higherQ, String msg) {
        int moved = 0;
        QueueElement<E> e = lowerQ.poll();
        // a delay below -maxWait means the element expired more than maxWait ago
        while (e != null && e.getDelay(TimeUnit.MILLISECONDS) < -maxWait) {
            // restart the age of the element so a new maximum wait time applies in the higher sub-queue
            e.setDelay(0, TimeUnit.MILLISECONDS);
            if (!higherQ.offer(e)) {
                throw new IllegalStateException("Could not move element to higher sub-queue, element rejected");
            }
            e.priority++;
            e = lowerQ.poll();
            moved++;
        }
        if (e != null) {
            // the last polled element has not exceeded the max wait time, put it back where it was
            if (!lowerQ.offer(e)) {
                throw new IllegalStateException("Could not reinsert element to current sub-queue, element rejected");
            }
        }
        debug("anti-starvation, moved {0} element(s) {1}", moved, msg);
    }

    /**
     * Method for debugging purposes. This implementation is a <tt>NOP</tt>.
     * <p/>
     * This method should be overridden for logging purposes.
     * <p/>
     * Message templates used by this class are in JDK's <tt>MessageFormat</tt> syntax.
     *
     * @param msgTemplate message template.
     * @param msgArgs arguments for the message template.
     */
    protected void debug(String msgTemplate, Object... msgArgs) {
    }

    /**
     * Insert the specified element into this queue.
     * <p/>
     * IMPORTANT: This implementation forces the addition of the element to the queue regardless
     * of the queue current size, it retries every 10ms only if a sub-queue rejects the element.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if the priority of the element is out of the range supported by this queue
     * @throws IllegalStateException if the element is already in a queue
     */
    @Override
    public void put(QueueElement<E> e) throws InterruptedException {
        while (!offer(e, true)) {
            Thread.sleep(10);
        }
    }

    /**
     * Insert the specified element into this queue.
     * <p/>
     * IMPORTANT: This implementation forces the addition of the element to the queue regardless
     * of the queue current size. The timeout value is ignored as the element is added immediately.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @param e the element to add
     * @param timeout ignored.
     * @param unit ignored.
     * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the sub-queue rejected it.
     * @throws InterruptedException never thrown by this implementation
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if the priority of the element is out of the range supported by this queue
     * @throws IllegalStateException if the element is already in a queue
     */
    @Override
    public boolean offer(QueueElement<E> e, long timeout, TimeUnit unit) throws InterruptedException {
        return offer(e, true);
    }

    /**
     * Retrieve and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     * <p/>
     * IMPORTANT: This implementation has a delay of up to 10ms (when the queue is empty) to detect a new element
     * is available. It is doing a 10ms sleep.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public QueueElement<E> take() throws InterruptedException {
        QueueElement<E> e = poll();
        while (e == null) {
            Thread.sleep(10);
            e = poll();
        }
        return e;
    }

    /**
     * Retrieve and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way,
     * it polls the queue every 10ms until an element is available or the wait time elapses.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public QueueElement<E> poll(long timeout, TimeUnit unit) throws InterruptedException {
        QueueElement<E> e = poll();
        long now = System.currentTimeMillis();
        long waitMillis = unit.toMillis(timeout);
        // saturate instead of overflowing into a deadline in the past
        long deadline = (waitMillis > Long.MAX_VALUE - now) ? Long.MAX_VALUE : now + waitMillis;
        while (e == null && deadline > System.currentTimeMillis()) {
            Thread.sleep(10);
            e = poll();
        }
        // return the element already removed from the queue; polling again here would drop it
        return e;
    }

    /**
     * Return the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic
     * limit.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting <tt>remainingCapacity</tt>
     * because it may be the case that another thread is about to
     * insert or remove an element.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @return the remaining capacity, <tt>Integer.MAX_VALUE</tt> if the queue is unbounded, never negative.
     */
    @Override
    public int remainingCapacity() {
        if (maxSize == -1) {
            return Integer.MAX_VALUE;
        }
        // forced insertions may push the size over maxSize, never report a negative capacity
        return Math.max(0, maxSize - size());
    }

    /**
     * Remove all available elements (the ones with an expired delay) from this queue and adds them
     * to the given collection, higher priority elements first.  A failure
     * encountered while attempting to add elements to
     * collection <tt>c</tt> may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempt to drain a queue to itself result in
     * <tt>IllegalArgumentException</tt>. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    @Override
    public int drainTo(Collection<? super QueueElement<E>> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Remove at most the given number of available elements (the ones with an expired delay) from
     * this queue and adds them to the given collection, higher priority elements first.  A failure
     * encountered while attempting to add elements to
     * collection <tt>c</tt> may result in elements being in neither,
     * either or both collections when the associated exception is
     * thrown.  Attempt to drain a queue to itself result in
     * <tt>IllegalArgumentException</tt>. Further, the behavior of
     * this operation is undefined if the specified collection is
     * modified while the operation is in progress.
     * <p/>
     * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException if addition of elements
     *         is not supported by the specified collection
     * @throws ClassCastException if the class of an element of this queue
     *         prevents it from being added to the specified collection
     * @throws NullPointerException if the specified collection is null
     * @throws IllegalArgumentException if the specified collection is this
     *         queue, or some property of an element of this queue prevents
     *         it from being added to the specified collection
     */
    @Override
    public int drainTo(Collection<? super QueueElement<E>> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException("collection is NULL");
        }
        if (c == this) {
            throw new IllegalArgumentException("cannot drain a queue to itself");
        }
        if (maxElements <= 0) {
            return 0;
        }
        // Drain into a local list first so the size counter and the inQueue flags can be updated under the lock,
        // the target collection is only touched after the lock has been released.
        List<QueueElement<E>> drained = new ArrayList<QueueElement<E>>();
        lock.lock();
        try {
            for (int i = priorities - 1; i > -1 && drained.size() < maxElements; i--) {
                queues[i].drainTo(drained, maxElements - drained.size());
            }
            for (QueueElement<E> e : drained) {
                e.inQueue = false;
            }
            if (currentSize != null) {
                currentSize.addAndGet(-drained.size());
            }
        }
        finally {
            lock.unlock();
        }
        c.addAll(drained);
        return drained.size();
    }

    /**
     * Removes all of the elements from this queue. The queue will be empty after this call returns.
     * <p/>
     * The removed elements are released, they can be queued again.
     */
    @Override
    public void clear() {
        lock.lock();
        try {
            for (DelayQueue<QueueElement<E>> q : queues) {
                for (QueueElement<E> e : q) {
                    e.inQueue = false;
                }
                q.clear();
            }
            if (currentSize != null) {
                currentSize.set(0);
            }
        }
        finally {
            lock.unlock();
        }
    }
}