001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.util; 019 020 import java.util.AbstractQueue; 021 import java.util.ArrayList; 022 import java.util.Arrays; 023 import java.util.Collection; 024 import java.util.ConcurrentModificationException; 025 import java.util.Iterator; 026 import java.util.List; 027 import java.util.concurrent.BlockingQueue; 028 import java.util.concurrent.DelayQueue; 029 import java.util.concurrent.Delayed; 030 import java.util.concurrent.TimeUnit; 031 import java.util.concurrent.atomic.AtomicInteger; 032 import java.util.concurrent.locks.ReentrantLock; 033 034 /** 035 * A Queue implementation that support queuing elements into the future and priority queuing. 036 * <p/> 037 * The {@link PriorityDelayQueue} avoids starvation by raising elements priority as they age. 038 * <p/> 039 * To support queuing elements into the future, the JDK <code>DelayQueue</code> is used. 040 * <p/> 041 * To support priority queuing, an array of <code>DelayQueue</code> sub-queues is used. Elements are consumed from the 042 * higher priority sub-queues first. From a sub-queue, elements are available based on their age. 043 * <p/> 044 * To avoid starvation, there is is maximum wait time for an an element in a sub-queue, after the maximum wait time has 045 * elapsed, the element is promoted to the next higher priority sub-queue. Eventually it will reach the maximum priority 046 * sub-queue and it will be consumed when it is the oldest element in the that sub-queue. 047 * <p/> 048 * Every time an element is promoted to a higher priority sub-queue, a new maximum wait time applies. 049 * <p/> 050 * This class does not use a separate thread for anti-starvation check, instead, the check is performed on polling and 051 * seeking operations. This check is performed, the most every 1/2 second. 052 */ 053 public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>> 054 implements BlockingQueue<PriorityDelayQueue.QueueElement<E>> { 055 056 /** 057 * Element wrapper required by the queue. 058 * <p/> 059 * This wrapper keeps track of the priority and the age of a queue element. 060 */ 061 public static class QueueElement<E> implements Delayed { 062 private E element; 063 private int priority; 064 private long baseTime; 065 boolean inQueue; 066 067 /** 068 * Create an Element wrapper. 069 * 070 * @param element element. 071 * @param priority priority of the element. 072 * @param delay delay of the element. 073 * @param unit time unit of the delay. 074 * 075 * @throws IllegalArgumentException if the element is <tt>NULL</tt>, the priority is negative or if the delay is 076 * negative. 077 */ 078 public QueueElement(E element, int priority, long delay, TimeUnit unit) { 079 if (element == null) { 080 throw new IllegalArgumentException("element cannot be null"); 081 } 082 if (priority < 0) { 083 throw new IllegalArgumentException("priority cannot be negative, [" + element + "]"); 084 } 085 if (delay < 0) { 086 throw new IllegalArgumentException("delay cannot be negative"); 087 } 088 this.element = element; 089 this.priority = priority; 090 setDelay(delay, unit); 091 } 092 093 /** 094 * Create an Element wrapper with no delay and minimum priority. 095 * 096 * @param element element. 097 */ 098 public QueueElement(E element) { 099 this(element, 0, 0, TimeUnit.MILLISECONDS); 100 } 101 102 /** 103 * Return the element from the wrapper. 104 * 105 * @return the element. 106 */ 107 public E getElement() { 108 return element; 109 } 110 111 /** 112 * Return the priority of the element. 113 * 114 * @return the priority of the element. 115 */ 116 public int getPriority() { 117 return priority; 118 } 119 120 /** 121 * Set the delay of the element. 122 * 123 * @param delay delay of the element. 124 * @param unit time unit of the delay. 125 */ 126 public void setDelay(long delay, TimeUnit unit) { 127 baseTime = System.currentTimeMillis() + unit.toMillis(delay); 128 } 129 130 /** 131 * Return the delay of the element. 132 * 133 * @param unit time unit of the delay. 134 * 135 * @return the delay in the specified time unit. 136 */ 137 public long getDelay(TimeUnit unit) { 138 return unit.convert(baseTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); 139 } 140 141 /** 142 * Compare the age of this wrapper element with another. The priority is not used for the comparision. 143 * 144 * @param o the other wrapper element to compare with. 145 * 146 * @return less than zero if this wrapper is older, zero if both wrapper elements have the same age, greater 147 * than zero if the parameter wrapper element is older. 148 */ 149 public int compareTo(Delayed o) { 150 long diff = (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); 151 if(diff > 0) { 152 return 1; 153 } else if(diff < 0) { 154 return -1; 155 } else { 156 return 0; 157 } 158 } 159 160 /** 161 * Return the string representation of the wrapper element. 162 * 163 * @return the string representation of the wrapper element. 164 */ 165 @Override 166 public String toString() { 167 StringBuilder sb = new StringBuilder(); 168 sb.append("[").append(element).append("] priority=").append(priority).append(" delay="). 169 append(getDelay(TimeUnit.MILLISECONDS)); 170 return sb.toString(); 171 } 172 173 } 174 175 /** 176 * Frequency, in milliseconds, of the anti-starvation check. 177 */ 178 public static final long ANTI_STARVATION_INTERVAL = 500; 179 180 protected int priorities; 181 protected DelayQueue<QueueElement<E>>[] queues; 182 protected transient final ReentrantLock lock = new ReentrantLock(); 183 private transient long lastAntiStarvationCheck = 0; 184 private long maxWait; 185 private int maxSize; 186 protected AtomicInteger currentSize; 187 188 /** 189 * Create a <code>PriorityDelayQueue</code>. 190 * 191 * @param priorities number of priorities the queue will support. 192 * @param maxWait max wait time for elements before they are promoted to the next higher priority. 193 * @param unit time unit of the max wait time. 194 * @param maxSize maximum size of the queue, -1 means unbounded. 195 */ 196 @SuppressWarnings("unchecked") 197 public PriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) { 198 if (priorities < 1) { 199 throw new IllegalArgumentException("priorities must be 1 or more"); 200 } 201 if (maxWait < 0) { 202 throw new IllegalArgumentException("maxWait must be greater than 0"); 203 } 204 if (maxSize < -1 || maxSize == 0) { 205 throw new IllegalArgumentException("maxSize must be -1 or greater than 0"); 206 } 207 this.priorities = priorities; 208 queues = new DelayQueue[priorities]; 209 for (int i = 0; i < priorities; i++) { 210 queues[i] = new DelayQueue<QueueElement<E>>(); 211 } 212 this.maxWait = unit.toMillis(maxWait); 213 this.maxSize = maxSize; 214 if (maxSize != -1) { 215 currentSize = new AtomicInteger(); 216 } 217 } 218 219 /** 220 * Return number of priorities the queue supports. 221 * 222 * @return number of priorities the queue supports. 223 */ 224 public int getPriorities() { 225 return priorities; 226 } 227 228 /** 229 * Return the max wait time for elements before they are promoted to the next higher priority. 230 * 231 * @param unit time unit of the max wait time. 232 * 233 * @return the max wait time in the specified time unit. 234 */ 235 public long getMaxWait(TimeUnit unit) { 236 return unit.convert(maxWait, TimeUnit.MILLISECONDS); 237 } 238 239 /** 240 * Return the maximum queue size. 241 * 242 * @return the maximum queue size. If <code>-1</code> the queue is unbounded. 243 */ 244 public long getMaxSize() { 245 return maxSize; 246 } 247 248 /** 249 * Return an iterator over all the {@link QueueElement} elements (both expired and unexpired) in this queue. The 250 * iterator does not return the elements in any particular order. The returned <tt>Iterator</tt> is a "weakly 251 * consistent" iterator that will never throw {@link ConcurrentModificationException}, and guarantees to traverse 252 * elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any 253 * modifications subsequent to construction. 254 * 255 * @return an iterator over the {@link QueueElement} elements in this queue. 256 */ 257 @Override 258 @SuppressWarnings("unchecked") 259 public Iterator<QueueElement<E>> iterator() { 260 QueueElement[][] queueElements = new QueueElement[queues.length][]; 261 try { 262 lock.lock(); 263 for (int i = 0; i < queues.length; i++) { 264 queueElements[i] = queues[i].toArray(new QueueElement[0]); 265 } 266 } 267 finally { 268 lock.unlock(); 269 } 270 List<QueueElement<E>> list = new ArrayList<QueueElement<E>>(); 271 for (QueueElement[] elements : queueElements) { 272 list.addAll(Arrays.asList((QueueElement<E>[]) elements)); 273 } 274 return list.iterator(); 275 } 276 277 /** 278 * Return the number of elements in the queue. 279 * 280 * @return the number of elements in the queue. 281 */ 282 @Override 283 public int size() { 284 int size = 0; 285 for (DelayQueue<QueueElement<E>> queue : queues) { 286 size += queue.size(); 287 } 288 return size; 289 } 290 291 /** 292 * Return the number of elements on each priority sub-queue. 293 * 294 * @return the number of elements on each priority sub-queue. 295 */ 296 public int[] sizes() { 297 int[] sizes = new int[queues.length]; 298 for (int i = 0; i < queues.length; i++) { 299 sizes[i] = queues[i].size(); 300 } 301 return sizes; 302 } 303 304 /** 305 * Inserts the specified element into this queue if it is possible to do 306 * so immediately without violating capacity restrictions, returning 307 * <tt>true</tt> upon success and throwing an 308 * <tt>IllegalStateException</tt> if no space is currently available. 309 * When using a capacity-restricted queue, it is generally preferable to 310 * use {@link #offer(Object) offer}. 311 * 312 * @param queueElement the {@link QueueElement} element to add. 313 * @return <tt>true</tt> (as specified by {@link Collection#add}) 314 * @throws IllegalStateException if the element cannot be added at this 315 * time due to capacity restrictions 316 * @throws ClassCastException if the class of the specified element 317 * prevents it from being added to this queue 318 * @throws NullPointerException if the specified element is null 319 * @throws IllegalArgumentException if some property of the specified 320 * element prevents it from being added to this queue 321 */ 322 @Override 323 public boolean add(QueueElement<E> queueElement) { 324 return offer(queueElement, false); 325 } 326 327 /** 328 * Insert the specified {@link QueueElement} element into the queue. 329 * 330 * @param queueElement the {@link QueueElement} element to add. 331 * @param ignoreSize if the queue is bound to a maximum size and the maximum size is reached, this parameter (if set 332 * to <tt>true</tt>) allows to ignore the maximum size and add the element to the queue. 333 * 334 * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue 335 * has reached its maximum size). 336 * 337 * @throws NullPointerException if the specified element is null 338 */ 339 boolean offer(QueueElement<E> queueElement, boolean ignoreSize) { 340 if (queueElement == null) { 341 throw new NullPointerException("queueElement is NULL"); 342 } 343 if (queueElement.getPriority() < 0 && queueElement.getPriority() >= priorities) { 344 throw new IllegalArgumentException("priority out of range"); 345 } 346 if (queueElement.inQueue) { 347 throw new IllegalStateException("queueElement already in a queue"); 348 } 349 if (!ignoreSize && currentSize != null && currentSize.get() >= maxSize) { 350 return false; 351 } 352 boolean accepted = queues[queueElement.getPriority()].offer(queueElement); 353 debug("offer([{0}]), to P[{1}] delay[{2}ms] accepted[{3}]", queueElement.getElement().toString(), 354 queueElement.getPriority(), queueElement.getDelay(TimeUnit.MILLISECONDS), accepted); 355 if (accepted) { 356 if (currentSize != null) { 357 currentSize.incrementAndGet(); 358 } 359 queueElement.inQueue = true; 360 } 361 return accepted; 362 } 363 364 /** 365 * Insert the specified element into the queue. 366 * <p/> 367 * The element is added with minimun priority and no delay. 368 * 369 * @param queueElement the element to add. 370 * 371 * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue 372 * has reached its maximum size). 373 * 374 * @throws NullPointerException if the specified element is null 375 */ 376 @Override 377 public boolean offer(QueueElement<E> queueElement) { 378 return offer(queueElement, false); 379 } 380 381 /** 382 * Retrieve and remove the head of this queue, or return <tt>null</tt> if this queue has no elements with an expired 383 * delay. 384 * <p/> 385 * The retrieved element is the oldest one from the highest priority sub-queue. 386 * <p/> 387 * Invocations to this method run the anti-starvation (once every interval check). 388 * 389 * @return the head of this queue, or <tt>null</tt> if this queue has no elements with an expired delay. 390 */ 391 @Override 392 public QueueElement<E> poll() { 393 try { 394 lock.lock(); 395 antiStarvation(); 396 QueueElement<E> e = null; 397 int i = priorities; 398 for (; e == null && i > 0; i--) { 399 e = queues[i - 1].poll(); 400 } 401 if (e != null) { 402 if (currentSize != null) { 403 currentSize.decrementAndGet(); 404 } 405 e.inQueue = false; 406 debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i); 407 } 408 return e; 409 } 410 finally { 411 lock.unlock(); 412 } 413 } 414 415 /** 416 * Retrieve, but does not remove, the head of this queue, or returns <tt>null</tt> if this queue is empty. Unlike 417 * <tt>poll</tt>, if no expired elements are available in the queue, this method returns the element that will 418 * expire next, if one exists. 419 * 420 * @return the head of this queue, or <tt>null</tt> if this queue is empty. 421 */ 422 @Override 423 public QueueElement<E> peek() { 424 try { 425 lock.lock(); 426 antiStarvation(); 427 QueueElement<E> e = null; 428 429 QueueElement<E> [] seeks = new QueueElement[priorities]; 430 boolean foundElement = false; 431 for (int i = priorities - 1; i > -1; i--) { 432 e = queues[i].peek(); 433 debug("peek(): considering [{0}] from P[{1}]", e, i); 434 seeks[priorities - i - 1] = e; 435 foundElement |= e != null; 436 } 437 if (foundElement) { 438 e = null; 439 for (int i = 0; e == null && i < priorities; i++) { 440 if (seeks[i] != null && seeks[i].getDelay(TimeUnit.MILLISECONDS) > 0) { 441 debug("peek, ignoring [{0}]", seeks[i]); 442 } 443 else { 444 e = seeks[i]; 445 } 446 } 447 if (e != null) { 448 debug("peek(): choosing [{0}]", e); 449 } 450 if (e == null) { 451 int first; 452 for (first = 0; e == null && first < priorities; first++) { 453 e = seeks[first]; 454 } 455 if (e != null) { 456 debug("peek(): initial choosing [{0}]", e); 457 } 458 for (int i = first; i < priorities; i++) { 459 QueueElement<E> ee = seeks[i]; 460 if (ee != null && ee.getDelay(TimeUnit.MILLISECONDS) < e.getDelay(TimeUnit.MILLISECONDS)) { 461 debug("peek(): choosing [{0}] over [{1}]", ee, e); 462 e = ee; 463 } 464 } 465 } 466 } 467 if (e != null) { 468 debug("peek(): [{0}], from P[{1}]", e.getElement().toString(), e.getPriority()); 469 } 470 else { 471 debug("peek(): NULL"); 472 } 473 return e; 474 } 475 finally { 476 lock.unlock(); 477 } 478 } 479 480 /** 481 * Run the anti-starvation check every {@link #ANTI_STARVATION_INTERVAL} milliseconds. 482 * <p/> 483 * It promotes elements beyond max wait time to the next higher priority sub-queue. 484 */ 485 protected void antiStarvation() { 486 long now = System.currentTimeMillis(); 487 if (now - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL) { 488 for (int i = 0; i < queues.length - 1; i++) { 489 antiStarvation(queues[i], queues[i + 1], "from P[" + i + "] to P[" + (i + 1) + "]"); 490 } 491 StringBuilder sb = new StringBuilder(); 492 for (int i = 0; i < queues.length; i++) { 493 sb.append("P[").append(i).append("]=").append(queues[i].size()).append(" "); 494 } 495 debug("sub-queue sizes: {0}", sb.toString()); 496 lastAntiStarvationCheck = System.currentTimeMillis(); 497 } 498 } 499 500 /** 501 * Promote elements beyond max wait time from a lower priority sub-queue to a higher priority sub-queue. 502 * 503 * @param lowerQ lower priority sub-queue. 504 * @param higherQ higher priority sub-queue. 505 * @param msg sub-queues msg (from-to) for debugging purposes. 506 */ 507 private void antiStarvation(DelayQueue<QueueElement<E>> lowerQ, DelayQueue<QueueElement<E>> higherQ, String msg) { 508 int moved = 0; 509 QueueElement<E> e = lowerQ.poll(); 510 while (e != null && e.getDelay(TimeUnit.MILLISECONDS) < -maxWait) { 511 e.setDelay(0, TimeUnit.MILLISECONDS); 512 if (!higherQ.offer(e)) { 513 throw new IllegalStateException("Could not move element to higher sub-queue, element rejected"); 514 } 515 e.priority++; 516 e = lowerQ.poll(); 517 moved++; 518 } 519 if (e != null) { 520 if (!lowerQ.offer(e)) { 521 throw new IllegalStateException("Could not reinsert element to current sub-queue, element rejected"); 522 } 523 } 524 debug("anti-starvation, moved {0} element(s) {1}", moved, msg); 525 } 526 527 /** 528 * Method for debugging purposes. This implementation is a <tt>NOP</tt>. 529 * <p/> 530 * This method should be overriden for logging purposes. 531 * <p/> 532 * Message templates used by this class are in JDK's <tt>MessageFormat</tt> syntax. 533 * 534 * @param msgTemplate message template. 535 * @param msgArgs arguments for the message template. 536 */ 537 protected void debug(String msgTemplate, Object... msgArgs) { 538 } 539 540 /** 541 * Insert the specified element into this queue, waiting if necessary 542 * for space to become available. 543 * <p/> 544 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way. 545 * 546 * @param e the element to add 547 * @throws InterruptedException if interrupted while waiting 548 * @throws ClassCastException if the class of the specified element 549 * prevents it from being added to this queue 550 * @throws NullPointerException if the specified element is null 551 * @throws IllegalArgumentException if some property of the specified 552 * element prevents it from being added to this queue 553 */ 554 @Override 555 public void put(QueueElement<E> e) throws InterruptedException { 556 while (!offer(e, true)) { 557 Thread.sleep(10); 558 } 559 } 560 561 /** 562 * Insert the specified element into this queue, waiting up to the 563 * specified wait time if necessary for space to become available. 564 * <p/> 565 * IMPORTANT: This implementation forces the addition of the element to the queue regardless 566 * of the queue current size. The timeout value is ignored as the element is added immediately. 567 * <p/> 568 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way. 569 * 570 * @param e the element to add 571 * @param timeout how long to wait before giving up, in units of 572 * <tt>unit</tt> 573 * @param unit a <tt>TimeUnit</tt> determining how to interpret the 574 * <tt>timeout</tt> parameter 575 * @return <tt>true</tt> if successful, or <tt>false</tt> if 576 * the specified waiting time elapses before space is available 577 * @throws InterruptedException if interrupted while waiting 578 * @throws ClassCastException if the class of the specified element 579 * prevents it from being added to this queue 580 * @throws NullPointerException if the specified element is null 581 * @throws IllegalArgumentException if some property of the specified 582 * element prevents it from being added to this queue 583 */ 584 @Override 585 public boolean offer(QueueElement<E> e, long timeout, TimeUnit unit) throws InterruptedException { 586 return offer(e, true); 587 } 588 589 /** 590 * Retrieve and removes the head of this queue, waiting if necessary 591 * until an element becomes available. 592 * <p/> 593 * IMPORTANT: This implementation has a delay of up to 10ms (when the queue is empty) to detect a new element 594 * is available. It is doing a 10ms sleep. 595 * <p/> 596 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way. 597 * 598 * @return the head of this queue 599 * @throws InterruptedException if interrupted while waiting 600 */ 601 @Override 602 public QueueElement<E> take() throws InterruptedException { 603 QueueElement<E> e = poll(); 604 while (e == null) { 605 Thread.sleep(10); 606 e = poll(); 607 } 608 return e; 609 } 610 611 /** 612 * Retrieve and removes the head of this queue, waiting up to the 613 * specified wait time if necessary for an element to become available. 614 * <p/> 615 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way. 616 * 617 * @param timeout how long to wait before giving up, in units of 618 * <tt>unit</tt> 619 * @param unit a <tt>TimeUnit</tt> determining how to interpret the 620 * <tt>timeout</tt> parameter 621 * @return the head of this queue, or <tt>null</tt> if the 622 * specified waiting time elapses before an element is available 623 * @throws InterruptedException if interrupted while waiting 624 */ 625 @Override 626 public QueueElement<E> poll(long timeout, TimeUnit unit) throws InterruptedException { 627 QueueElement<E> e = poll(); 628 long time = System.currentTimeMillis() + unit.toMillis(timeout); 629 while (e == null && time > System.currentTimeMillis()) { 630 Thread.sleep(10); 631 e = poll(); 632 } 633 return poll(); 634 } 635 636 /** 637 * Return the number of additional elements that this queue can ideally 638 * (in the absence of memory or resource constraints) accept without 639 * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic 640 * limit. 641 * 642 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 643 * an element will succeed by inspecting <tt>remainingCapacity</tt> 644 * because it may be the case that another thread is about to 645 * insert or remove an element. 646 * <p/> 647 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way. 648 * 649 * @return the remaining capacity 650 */ 651 @Override 652 public int remainingCapacity() { 653 return (maxSize == -1) ? -1 : maxSize - size(); 654 } 655 656 /** 657 * Remove all available elements from this queue and adds them 658 * to the given collection. This operation may be more 659 * efficient than repeatedly polling this queue. A failure 660 * encountered while attempting to add elements to 661 * collection <tt>c</tt> may result in elements being in neither, 662 * either or both collections when the associated exception is 663 * thrown. Attempt to drain a queue to itself result in 664 * <tt>IllegalArgumentException</tt>. Further, the behavior of 665 * this operation is undefined if the specified collection is 666 * modified while the operation is in progress. 667 * <p/> 668 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way. 669 * 670 * @param c the collection to transfer elements into 671 * @return the number of elements transferred 672 * @throws UnsupportedOperationException if addition of elements 673 * is not supported by the specified collection 674 * @throws ClassCastException if the class of an element of this queue 675 * prevents it from being added to the specified collection 676 * @throws NullPointerException if the specified collection is null 677 * @throws IllegalArgumentException if the specified collection is this 678 * queue, or some property of an element of this queue prevents 679 * it from being added to the specified collection 680 */ 681 @Override 682 public int drainTo(Collection<? super QueueElement<E>> c) { 683 int count = 0; 684 for (DelayQueue<QueueElement<E>> q : queues) { 685 count += q.drainTo(c); 686 } 687 return count; 688 } 689 690 /** 691 * Remove at most the given number of available elements from 692 * this queue and adds them to the given collection. A failure 693 * encountered while attempting to add elements to 694 * collection <tt>c</tt> may result in elements being in neither, 695 * either or both collections when the associated exception is 696 * thrown. Attempt to drain a queue to itself result in 697 * <tt>IllegalArgumentException</tt>. Further, the behavior of 698 * this operation is undefined if the specified collection is 699 * modified while the operation is in progress. 700 * <p/> 701 * NOTE: This method is to fulfill the <tt>BlockingQueue<tt/> interface. Not implemented in the most optimal way. 702 * 703 * @param c the collection to transfer elements into 704 * @param maxElements the maximum number of elements to transfer 705 * @return the number of elements transferred 706 * @throws UnsupportedOperationException if addition of elements 707 * is not supported by the specified collection 708 * @throws ClassCastException if the class of an element of this queue 709 * prevents it from being added to the specified collection 710 * @throws NullPointerException if the specified collection is null 711 * @throws IllegalArgumentException if the specified collection is this 712 * queue, or some property of an element of this queue prevents 713 * it from being added to the specified collection 714 */ 715 @Override 716 public int drainTo(Collection<? super QueueElement<E>> c, int maxElements) { 717 int left = maxElements; 718 int count = 0; 719 for (DelayQueue<QueueElement<E>> q : queues) { 720 int drained = q.drainTo(c, left); 721 count += drained; 722 left -= drained; 723 } 724 return count; 725 } 726 727 /** 728 * Removes all of the elements from this queue. The queue will be empty after this call returns. 729 */ 730 @Override 731 public void clear() { 732 for (DelayQueue<QueueElement<E>> q : queues) { 733 q.clear(); 734 } 735 } 736 }