This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.util;
019
020 import java.util.AbstractQueue;
021 import java.util.ArrayList;
022 import java.util.Arrays;
023 import java.util.Collection;
024 import java.util.ConcurrentModificationException;
025 import java.util.Iterator;
026 import java.util.List;
027 import java.util.concurrent.BlockingQueue;
028 import java.util.concurrent.DelayQueue;
029 import java.util.concurrent.Delayed;
030 import java.util.concurrent.TimeUnit;
031 import java.util.concurrent.atomic.AtomicInteger;
032 import java.util.concurrent.locks.ReentrantLock;
033
034 /**
035 * A Queue implementation that support queuing elements into the future and priority queuing.
036 * <p/>
037 * The {@link PriorityDelayQueue} avoids starvation by raising elements priority as they age.
038 * <p/>
039 * To support queuing elements into the future, the JDK <code>DelayQueue</code> is used.
040 * <p/>
041 * To support priority queuing, an array of <code>DelayQueue</code> sub-queues is used. Elements are consumed from the
042 * higher priority sub-queues first. From a sub-queue, elements are available based on their age.
043 * <p/>
044 * To avoid starvation, there is is maximum wait time for an an element in a sub-queue, after the maximum wait time has
045 * elapsed, the element is promoted to the next higher priority sub-queue. Eventually it will reach the maximum priority
046 * sub-queue and it will be consumed when it is the oldest element in the that sub-queue.
047 * <p/>
048 * Every time an element is promoted to a higher priority sub-queue, a new maximum wait time applies.
049 * <p/>
050 * This class does not use a separate thread for anti-starvation check, instead, the check is performed on polling and
051 * seeking operations. This check is performed, the most every 1/2 second.
052 */
053 public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>>
054 implements BlockingQueue<PriorityDelayQueue.QueueElement<E>> {
055
056 /**
057 * Element wrapper required by the queue.
058 * <p/>
059 * This wrapper keeps track of the priority and the age of a queue element.
060 */
061 public static class QueueElement<E> implements Delayed {
062 private E element;
063 private int priority;
064 private long baseTime;
065 boolean inQueue;
066
067 /**
068 * Create an Element wrapper.
069 *
070 * @param element element.
071 * @param priority priority of the element.
072 * @param delay delay of the element.
073 * @param unit time unit of the delay.
074 *
075 * @throws IllegalArgumentException if the element is <tt>NULL</tt>, the priority is negative or if the delay is
076 * negative.
077 */
078 public QueueElement(E element, int priority, long delay, TimeUnit unit) {
079 if (element == null) {
080 throw new IllegalArgumentException("element cannot be null");
081 }
082 if (priority < 0) {
083 throw new IllegalArgumentException("priority cannot be negative, [" + element + "]");
084 }
085 if (delay < 0) {
086 throw new IllegalArgumentException("delay cannot be negative");
087 }
088 this.element = element;
089 this.priority = priority;
090 setDelay(delay, unit);
091 }
092
093 /**
094 * Create an Element wrapper with no delay and minimum priority.
095 *
096 * @param element element.
097 */
098 public QueueElement(E element) {
099 this(element, 0, 0, TimeUnit.MILLISECONDS);
100 }
101
102 /**
103 * Return the element from the wrapper.
104 *
105 * @return the element.
106 */
107 public E getElement() {
108 return element;
109 }
110
111 /**
112 * Return the priority of the element.
113 *
114 * @return the priority of the element.
115 */
116 public int getPriority() {
117 return priority;
118 }
119
120 /**
121 * Set the delay of the element.
122 *
123 * @param delay delay of the element.
124 * @param unit time unit of the delay.
125 */
126 public void setDelay(long delay, TimeUnit unit) {
127 baseTime = System.currentTimeMillis() + unit.toMillis(delay);
128 }
129
130 /**
131 * Return the delay of the element.
132 *
133 * @param unit time unit of the delay.
134 *
135 * @return the delay in the specified time unit.
136 */
137 public long getDelay(TimeUnit unit) {
138 return unit.convert(baseTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
139 }
140
141 /**
142 * Compare the age of this wrapper element with another. The priority is not used for the comparision.
143 *
144 * @param o the other wrapper element to compare with.
145 *
146 * @return less than zero if this wrapper is older, zero if both wrapper elements have the same age, greater
147 * than zero if the parameter wrapper element is older.
148 */
149 public int compareTo(Delayed o) {
150 long diff = (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
151 if(diff > 0) {
152 return 1;
153 } else if(diff < 0) {
154 return -1;
155 } else {
156 return 0;
157 }
158 }
159
160 /**
161 * Return the string representation of the wrapper element.
162 *
163 * @return the string representation of the wrapper element.
164 */
165 @Override
166 public String toString() {
167 StringBuilder sb = new StringBuilder();
168 sb.append("[").append(element).append("] priority=").append(priority).append(" delay=").
169 append(getDelay(TimeUnit.MILLISECONDS));
170 return sb.toString();
171 }
172
173 }
174
175 /**
176 * Frequency, in milliseconds, of the anti-starvation check.
177 */
178 public static final long ANTI_STARVATION_INTERVAL = 500;
179
180 protected int priorities;
181 protected DelayQueue<QueueElement<E>>[] queues;
182 protected transient final ReentrantLock lock = new ReentrantLock();
183 private transient long lastAntiStarvationCheck = 0;
184 private long maxWait;
185 private int maxSize;
186 protected AtomicInteger currentSize;
187
188 /**
189 * Create a <code>PriorityDelayQueue</code>.
190 *
191 * @param priorities number of priorities the queue will support.
192 * @param maxWait max wait time for elements before they are promoted to the next higher priority.
193 * @param unit time unit of the max wait time.
194 * @param maxSize maximum size of the queue, -1 means unbounded.
195 */
196 @SuppressWarnings("unchecked")
197 public PriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
198 if (priorities < 1) {
199 throw new IllegalArgumentException("priorities must be 1 or more");
200 }
201 if (maxWait < 0) {
202 throw new IllegalArgumentException("maxWait must be greater than 0");
203 }
204 if (maxSize < -1 || maxSize == 0) {
205 throw new IllegalArgumentException("maxSize must be -1 or greater than 0");
206 }
207 this.priorities = priorities;
208 queues = new DelayQueue[priorities];
209 for (int i = 0; i < priorities; i++) {
210 queues[i] = new DelayQueue<QueueElement<E>>();
211 }
212 this.maxWait = unit.toMillis(maxWait);
213 this.maxSize = maxSize;
214 if (maxSize != -1) {
215 currentSize = new AtomicInteger();
216 }
217 }
218
219 /**
220 * Return number of priorities the queue supports.
221 *
222 * @return number of priorities the queue supports.
223 */
224 public int getPriorities() {
225 return priorities;
226 }
227
228 /**
229 * Return the max wait time for elements before they are promoted to the next higher priority.
230 *
231 * @param unit time unit of the max wait time.
232 *
233 * @return the max wait time in the specified time unit.
234 */
235 public long getMaxWait(TimeUnit unit) {
236 return unit.convert(maxWait, TimeUnit.MILLISECONDS);
237 }
238
239 /**
240 * Return the maximum queue size.
241 *
242 * @return the maximum queue size. If <code>-1</code> the queue is unbounded.
243 */
244 public long getMaxSize() {
245 return maxSize;
246 }
247
248 /**
249 * Return an iterator over all the {@link QueueElement} elements (both expired and unexpired) in this queue. The
250 * iterator does not return the elements in any particular order. The returned <tt>Iterator</tt> is a "weakly
251 * consistent" iterator that will never throw {@link ConcurrentModificationException}, and guarantees to traverse
252 * elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any
253 * modifications subsequent to construction.
254 *
255 * @return an iterator over the {@link QueueElement} elements in this queue.
256 */
257 @Override
258 @SuppressWarnings("unchecked")
259 public Iterator<QueueElement<E>> iterator() {
260 QueueElement[][] queueElements = new QueueElement[queues.length][];
261 try {
262 lock.lock();
263 for (int i = 0; i < queues.length; i++) {
264 queueElements[i] = queues[i].toArray(new QueueElement[0]);
265 }
266 }
267 finally {
268 lock.unlock();
269 }
270 List<QueueElement<E>> list = new ArrayList<QueueElement<E>>();
271 for (QueueElement[] elements : queueElements) {
272 list.addAll(Arrays.asList((QueueElement<E>[]) elements));
273 }
274 return list.iterator();
275 }
276
277 /**
278 * Return the number of elements in the queue.
279 *
280 * @return the number of elements in the queue.
281 */
282 @Override
283 public int size() {
284 int size = 0;
285 for (DelayQueue<QueueElement<E>> queue : queues) {
286 size += queue.size();
287 }
288 return size;
289 }
290
291 /**
292 * Return the number of elements on each priority sub-queue.
293 *
294 * @return the number of elements on each priority sub-queue.
295 */
296 public int[] sizes() {
297 int[] sizes = new int[queues.length];
298 for (int i = 0; i < queues.length; i++) {
299 sizes[i] = queues[i].size();
300 }
301 return sizes;
302 }
303
304 /**
305 * Inserts the specified element into this queue if it is possible to do
306 * so immediately without violating capacity restrictions, returning
307 * <tt>true</tt> upon success and throwing an
308 * <tt>IllegalStateException</tt> if no space is currently available.
309 * When using a capacity-restricted queue, it is generally preferable to
310 * use {@link #offer(Object) offer}.
311 *
312 * @param queueElement the {@link QueueElement} element to add.
313 * @return <tt>true</tt> (as specified by {@link Collection#add})
314 * @throws IllegalStateException if the element cannot be added at this
315 * time due to capacity restrictions
316 * @throws ClassCastException if the class of the specified element
317 * prevents it from being added to this queue
318 * @throws NullPointerException if the specified element is null
319 * @throws IllegalArgumentException if some property of the specified
320 * element prevents it from being added to this queue
321 */
322 @Override
323 public boolean add(QueueElement<E> queueElement) {
324 return offer(queueElement, false);
325 }
326
327 /**
328 * Insert the specified {@link QueueElement} element into the queue.
329 *
330 * @param queueElement the {@link QueueElement} element to add.
331 * @param ignoreSize if the queue is bound to a maximum size and the maximum size is reached, this parameter (if set
332 * to <tt>true</tt>) allows to ignore the maximum size and add the element to the queue.
333 *
334 * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
335 * has reached its maximum size).
336 *
337 * @throws NullPointerException if the specified element is null
338 */
339 boolean offer(QueueElement<E> queueElement, boolean ignoreSize) {
340 if (queueElement == null) {
341 throw new NullPointerException("queueElement is NULL");
342 }
343 if (queueElement.getPriority() < 0 && queueElement.getPriority() >= priorities) {
344 throw new IllegalArgumentException("priority out of range");
345 }
346 if (queueElement.inQueue) {
347 throw new IllegalStateException("queueElement already in a queue");
348 }
349 if (!ignoreSize && currentSize != null && currentSize.get() >= maxSize) {
350 return false;
351 }
352 boolean accepted = queues[queueElement.getPriority()].offer(queueElement);
353 debug("offer([{0}]), to P[{1}] delay[{2}ms] accepted[{3}]", queueElement.getElement().toString(),
354 queueElement.getPriority(), queueElement.getDelay(TimeUnit.MILLISECONDS), accepted);
355 if (accepted) {
356 if (currentSize != null) {
357 currentSize.incrementAndGet();
358 }
359 queueElement.inQueue = true;
360 }
361 return accepted;
362 }
363
364 /**
365 * Insert the specified element into the queue.
366 * <p/>
367 * The element is added with minimun priority and no delay.
368 *
369 * @param queueElement the element to add.
370 *
371 * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue
372 * has reached its maximum size).
373 *
374 * @throws NullPointerException if the specified element is null
375 */
376 @Override
377 public boolean offer(QueueElement<E> queueElement) {
378 return offer(queueElement, false);
379 }
380
381 /**
382 * Retrieve and remove the head of this queue, or return <tt>null</tt> if this queue has no elements with an expired
383 * delay.
384 * <p/>
385 * The retrieved element is the oldest one from the highest priority sub-queue.
386 * <p/>
387 * Invocations to this method run the anti-starvation (once every interval check).
388 *
389 * @return the head of this queue, or <tt>null</tt> if this queue has no elements with an expired delay.
390 */
391 @Override
392 public QueueElement<E> poll() {
393 try {
394 lock.lock();
395 antiStarvation();
396 QueueElement<E> e = null;
397 int i = priorities;
398 for (; e == null && i > 0; i--) {
399 e = queues[i - 1].poll();
400 }
401 if (e != null) {
402 if (currentSize != null) {
403 currentSize.decrementAndGet();
404 }
405 e.inQueue = false;
406 debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i);
407 }
408 return e;
409 }
410 finally {
411 lock.unlock();
412 }
413 }
414
415 /**
416 * Retrieve, but does not remove, the head of this queue, or returns <tt>null</tt> if this queue is empty. Unlike
417 * <tt>poll</tt>, if no expired elements are available in the queue, this method returns the element that will
418 * expire next, if one exists.
419 *
420 * @return the head of this queue, or <tt>null</tt> if this queue is empty.
421 */
422 @Override
423 public QueueElement<E> peek() {
424 try {
425 lock.lock();
426 antiStarvation();
427 QueueElement<E> e = null;
428
429 QueueElement<E> [] seeks = new QueueElement[priorities];
430 boolean foundElement = false;
431 for (int i = priorities - 1; i > -1; i--) {
432 e = queues[i].peek();
433 debug("peek(): considering [{0}] from P[{1}]", e, i);
434 seeks[priorities - i - 1] = e;
435 foundElement |= e != null;
436 }
437 if (foundElement) {
438 e = null;
439 for (int i = 0; e == null && i < priorities; i++) {
440 if (seeks[i] != null && seeks[i].getDelay(TimeUnit.MILLISECONDS) > 0) {
441 debug("peek, ignoring [{0}]", seeks[i]);
442 }
443 else {
444 e = seeks[i];
445 }
446 }
447 if (e != null) {
448 debug("peek(): choosing [{0}]", e);
449 }
450 if (e == null) {
451 int first;
452 for (first = 0; e == null && first < priorities; first++) {
453 e = seeks[first];
454 }
455 if (e != null) {
456 debug("peek(): initial choosing [{0}]", e);
457 }
458 for (int i = first; i < priorities; i++) {
459 QueueElement<E> ee = seeks[i];
460 if (ee != null && ee.getDelay(TimeUnit.MILLISECONDS) < e.getDelay(TimeUnit.MILLISECONDS)) {
461 debug("peek(): choosing [{0}] over [{1}]", ee, e);
462 e = ee;
463 }
464 }
465 }
466 }
467 if (e != null) {
468 debug("peek(): [{0}], from P[{1}]", e.getElement().toString(), e.getPriority());
469 }
470 else {
471 debug("peek(): NULL");
472 }
473 return e;
474 }
475 finally {
476 lock.unlock();
477 }
478 }
479
480 /**
481 * Run the anti-starvation check every {@link #ANTI_STARVATION_INTERVAL} milliseconds.
482 * <p/>
483 * It promotes elements beyond max wait time to the next higher priority sub-queue.
484 */
485 protected void antiStarvation() {
486 long now = System.currentTimeMillis();
487 if (now - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL) {
488 for (int i = 0; i < queues.length - 1; i++) {
489 antiStarvation(queues[i], queues[i + 1], "from P[" + i + "] to P[" + (i + 1) + "]");
490 }
491 StringBuilder sb = new StringBuilder();
492 for (int i = 0; i < queues.length; i++) {
493 sb.append("P[").append(i).append("]=").append(queues[i].size()).append(" ");
494 }
495 debug("sub-queue sizes: {0}", sb.toString());
496 lastAntiStarvationCheck = System.currentTimeMillis();
497 }
498 }
499
500 /**
501 * Promote elements beyond max wait time from a lower priority sub-queue to a higher priority sub-queue.
502 *
503 * @param lowerQ lower priority sub-queue.
504 * @param higherQ higher priority sub-queue.
505 * @param msg sub-queues msg (from-to) for debugging purposes.
506 */
507 private void antiStarvation(DelayQueue<QueueElement<E>> lowerQ, DelayQueue<QueueElement<E>> higherQ, String msg) {
508 int moved = 0;
509 QueueElement<E> e = lowerQ.poll();
510 while (e != null && e.getDelay(TimeUnit.MILLISECONDS) < -maxWait) {
511 e.setDelay(0, TimeUnit.MILLISECONDS);
512 if (!higherQ.offer(e)) {
513 throw new IllegalStateException("Could not move element to higher sub-queue, element rejected");
514 }
515 e.priority++;
516 e = lowerQ.poll();
517 moved++;
518 }
519 if (e != null) {
520 if (!lowerQ.offer(e)) {
521 throw new IllegalStateException("Could not reinsert element to current sub-queue, element rejected");
522 }
523 }
524 debug("anti-starvation, moved {0} element(s) {1}", moved, msg);
525 }
526
527 /**
528 * Method for debugging purposes. This implementation is a <tt>NOP</tt>.
529 * <p/>
530 * This method should be overriden for logging purposes.
531 * <p/>
532 * Message templates used by this class are in JDK's <tt>MessageFormat</tt> syntax.
533 *
534 * @param msgTemplate message template.
535 * @param msgArgs arguments for the message template.
536 */
537 protected void debug(String msgTemplate, Object... msgArgs) {
538 }
539
540 /**
541 * Insert the specified element into this queue, waiting if necessary
542 * for space to become available.
543 * <p/>
544 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way.
545 *
546 * @param e the element to add
547 * @throws InterruptedException if interrupted while waiting
548 * @throws ClassCastException if the class of the specified element
549 * prevents it from being added to this queue
550 * @throws NullPointerException if the specified element is null
551 * @throws IllegalArgumentException if some property of the specified
552 * element prevents it from being added to this queue
553 */
554 @Override
555 public void put(QueueElement<E> e) throws InterruptedException {
556 while (!offer(e, true)) {
557 Thread.sleep(10);
558 }
559 }
560
561 /**
562 * Insert the specified element into this queue, waiting up to the
563 * specified wait time if necessary for space to become available.
564 * <p/>
565 * IMPORTANT: This implementation forces the addition of the element to the queue regardless
566 * of the queue current size. The timeout value is ignored as the element is added immediately.
567 * <p/>
568 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way.
569 *
570 * @param e the element to add
571 * @param timeout how long to wait before giving up, in units of
572 * <tt>unit</tt>
573 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
574 * <tt>timeout</tt> parameter
575 * @return <tt>true</tt> if successful, or <tt>false</tt> if
576 * the specified waiting time elapses before space is available
577 * @throws InterruptedException if interrupted while waiting
578 * @throws ClassCastException if the class of the specified element
579 * prevents it from being added to this queue
580 * @throws NullPointerException if the specified element is null
581 * @throws IllegalArgumentException if some property of the specified
582 * element prevents it from being added to this queue
583 */
584 @Override
585 public boolean offer(QueueElement<E> e, long timeout, TimeUnit unit) throws InterruptedException {
586 return offer(e, true);
587 }
588
589 /**
590 * Retrieve and removes the head of this queue, waiting if necessary
591 * until an element becomes available.
592 * <p/>
593 * IMPORTANT: This implementation has a delay of up to 10ms (when the queue is empty) to detect a new element
594 * is available. It is doing a 10ms sleep.
595 * <p/>
596 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way.
597 *
598 * @return the head of this queue
599 * @throws InterruptedException if interrupted while waiting
600 */
601 @Override
602 public QueueElement<E> take() throws InterruptedException {
603 QueueElement<E> e = poll();
604 while (e == null) {
605 Thread.sleep(10);
606 e = poll();
607 }
608 return e;
609 }
610
611 /**
612 * Retrieve and removes the head of this queue, waiting up to the
613 * specified wait time if necessary for an element to become available.
614 * <p/>
615 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way.
616 *
617 * @param timeout how long to wait before giving up, in units of
618 * <tt>unit</tt>
619 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
620 * <tt>timeout</tt> parameter
621 * @return the head of this queue, or <tt>null</tt> if the
622 * specified waiting time elapses before an element is available
623 * @throws InterruptedException if interrupted while waiting
624 */
625 @Override
626 public QueueElement<E> poll(long timeout, TimeUnit unit) throws InterruptedException {
627 QueueElement<E> e = poll();
628 long time = System.currentTimeMillis() + unit.toMillis(timeout);
629 while (e == null && time > System.currentTimeMillis()) {
630 Thread.sleep(10);
631 e = poll();
632 }
633 return poll();
634 }
635
636 /**
637 * Return the number of additional elements that this queue can ideally
638 * (in the absence of memory or resource constraints) accept without
639 * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic
640 * limit.
641 *
642 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
643 * an element will succeed by inspecting <tt>remainingCapacity</tt>
644 * because it may be the case that another thread is about to
645 * insert or remove an element.
646 * <p/>
647 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way.
648 *
649 * @return the remaining capacity
650 */
651 @Override
652 public int remainingCapacity() {
653 return (maxSize == -1) ? -1 : maxSize - size();
654 }
655
656 /**
657 * Remove all available elements from this queue and adds them
658 * to the given collection. This operation may be more
659 * efficient than repeatedly polling this queue. A failure
660 * encountered while attempting to add elements to
661 * collection <tt>c</tt> may result in elements being in neither,
662 * either or both collections when the associated exception is
663 * thrown. Attempt to drain a queue to itself result in
664 * <tt>IllegalArgumentException</tt>. Further, the behavior of
665 * this operation is undefined if the specified collection is
666 * modified while the operation is in progress.
667 * <p/>
668 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way.
669 *
670 * @param c the collection to transfer elements into
671 * @return the number of elements transferred
672 * @throws UnsupportedOperationException if addition of elements
673 * is not supported by the specified collection
674 * @throws ClassCastException if the class of an element of this queue
675 * prevents it from being added to the specified collection
676 * @throws NullPointerException if the specified collection is null
677 * @throws IllegalArgumentException if the specified collection is this
678 * queue, or some property of an element of this queue prevents
679 * it from being added to the specified collection
680 */
681 @Override
682 public int drainTo(Collection<? super QueueElement<E>> c) {
683 int count = 0;
684 for (DelayQueue<QueueElement<E>> q : queues) {
685 count += q.drainTo(c);
686 }
687 return count;
688 }
689
690 /**
691 * Remove at most the given number of available elements from
692 * this queue and adds them to the given collection. A failure
693 * encountered while attempting to add elements to
694 * collection <tt>c</tt> may result in elements being in neither,
695 * either or both collections when the associated exception is
696 * thrown. Attempt to drain a queue to itself result in
697 * <tt>IllegalArgumentException</tt>. Further, the behavior of
698 * this operation is undefined if the specified collection is
699 * modified while the operation is in progress.
700 * <p/>
701 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way.
702 *
703 * @param c the collection to transfer elements into
704 * @param maxElements the maximum number of elements to transfer
705 * @return the number of elements transferred
706 * @throws UnsupportedOperationException if addition of elements
707 * is not supported by the specified collection
708 * @throws ClassCastException if the class of an element of this queue
709 * prevents it from being added to the specified collection
710 * @throws NullPointerException if the specified collection is null
711 * @throws IllegalArgumentException if the specified collection is this
712 * queue, or some property of an element of this queue prevents
713 * it from being added to the specified collection
714 */
715 @Override
716 public int drainTo(Collection<? super QueueElement<E>> c, int maxElements) {
717 int left = maxElements;
718 int count = 0;
719 for (DelayQueue<QueueElement<E>> q : queues) {
720 int drained = q.drainTo(c, left);
721 count += drained;
722 left -= drained;
723 }
724 return count;
725 }
726
727 /**
728 * Removes all of the elements from this queue. The queue will be empty after this call returns.
729 */
730 @Override
731 public void clear() {
732 for (DelayQueue<QueueElement<E>> q : queues) {
733 q.clear();
734 }
735 }
736 }