001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.util; 020 021import java.util.AbstractQueue; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collection; 025import java.util.ConcurrentModificationException; 026import java.util.Iterator; 027import java.util.List; 028import java.util.concurrent.BlockingQueue; 029import java.util.concurrent.DelayQueue; 030import java.util.concurrent.Delayed; 031import java.util.concurrent.FutureTask; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.concurrent.locks.ReentrantLock; 035 036/** 037 * A Queue implementation that support queuing elements into the future and priority queuing. 038 * <p> 039 * The {@link PriorityDelayQueue} avoids starvation by raising elements priority as they age. 040 * <p> 041 * To support queuing elements into the future, the JDK <code>DelayQueue</code> is used. 042 * <p> 043 * To support priority queuing, an array of <code>DelayQueue</code> sub-queues is used. Elements are consumed from the 044 * higher priority sub-queues first. From a sub-queue, elements are available based on their age. 045 * <p> 046 * To avoid starvation, there is is maximum wait time for an an element in a sub-queue, after the maximum wait time has 047 * elapsed, the element is promoted to the next higher priority sub-queue. Eventually it will reach the maximum priority 048 * sub-queue and it will be consumed when it is the oldest element in the that sub-queue. 049 * <p> 050 * Every time an element is promoted to a higher priority sub-queue, a new maximum wait time applies. 051 * <p> 052 * This class does not use a separate thread for anti-starvation check, instead, the check is performed on polling and 053 * seeking operations. This check is performed, the most every 1/2 second. 054 */ 055public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>> 056 implements BlockingQueue<PriorityDelayQueue.QueueElement<E>> { 057 058 /** 059 * Element wrapper required by the queue. 060 * <p> 061 * This wrapper keeps track of the priority and the age of a queue element. 062 */ 063 public static class QueueElement<E> extends FutureTask<E> implements Delayed { 064 private XCallable<E> element; 065 private int priority; 066 private long baseTime; 067 boolean inQueue; 068 069 /** 070 * Create an Element wrapper. 071 * 072 * @param element element. 073 * @param priority priority of the element. 074 * @param delay delay of the element. 075 * @param unit time unit of the delay. 076 * 077 * @throws IllegalArgumentException if the element is <tt>NULL</tt>, the priority is negative or if the delay is 078 * negative. 079 */ 080 public QueueElement(XCallable<E> element, int priority, long delay, TimeUnit unit) { 081 super(element); 082 if (priority < 0) { 083 throw new IllegalArgumentException("priority cannot be negative, [" + element + "]"); 084 } 085 if (delay < 0) { 086 throw new IllegalArgumentException("delay cannot be negative"); 087 } 088 this.element = element; 089 this.priority = priority; 090 setDelay(delay, unit); 091 } 092 093 /** 094 * Return the element from the wrapper. 095 * 096 * @return the element. 097 */ 098 public XCallable<E> getElement() { 099 return element; 100 } 101 102 /** 103 * Return the priority of the element. 104 * 105 * @return the priority of the element. 106 */ 107 public int getPriority() { 108 return priority; 109 } 110 111 /** 112 * Set the delay of the element. 113 * 114 * @param delay delay of the element. 115 * @param unit time unit of the delay. 116 */ 117 public void setDelay(long delay, TimeUnit unit) { 118 baseTime = System.currentTimeMillis() + unit.toMillis(delay); 119 } 120 121 /** 122 * Return the delay of the element. 123 * 124 * @param unit time unit of the delay. 125 * 126 * @return the delay in the specified time unit. 127 */ 128 public long getDelay(TimeUnit unit) { 129 return unit.convert(baseTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); 130 } 131 132 /** 133 * Compare the age of this wrapper element with another. The priority is not used for the comparision. 134 * 135 * @param o the other wrapper element to compare with. 136 * 137 * @return less than zero if this wrapper is older, zero if both wrapper elements have the same age, greater 138 * than zero if the parameter wrapper element is older. 139 */ 140 public int compareTo(Delayed o) { 141 long diff = (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); 142 if(diff > 0) { 143 return 1; 144 } else if(diff < 0) { 145 return -1; 146 } else { 147 return 0; 148 } 149 } 150 151 /** 152 * Return the string representation of the wrapper element. 153 * 154 * @return the string representation of the wrapper element. 155 */ 156 @Override 157 public String toString() { 158 StringBuilder sb = new StringBuilder(); 159 sb.append("[").append(element).append("] priority=").append(priority).append(" delay="). 160 append(getDelay(TimeUnit.MILLISECONDS)); 161 return sb.toString(); 162 } 163 164 } 165 166 /** 167 * Frequency, in milliseconds, of the anti-starvation check. 168 */ 169 public static final long ANTI_STARVATION_INTERVAL = 500; 170 171 protected int priorities; 172 protected DelayQueue<QueueElement<E>>[] queues; 173 protected transient final ReentrantLock lock = new ReentrantLock(); 174 private transient long lastAntiStarvationCheck = 0; 175 private long maxWait; 176 private int maxSize; 177 protected AtomicInteger currentSize; 178 179 /** 180 * Create a <code>PriorityDelayQueue</code>. 181 * 182 * @param priorities number of priorities the queue will support. 183 * @param maxWait max wait time for elements before they are promoted to the next higher priority. 184 * @param unit time unit of the max wait time. 185 * @param maxSize maximum size of the queue, -1 means unbounded. 186 */ 187 @SuppressWarnings("unchecked") 188 public PriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) { 189 if (priorities < 1) { 190 throw new IllegalArgumentException("priorities must be 1 or more"); 191 } 192 if (maxWait < 0) { 193 throw new IllegalArgumentException("maxWait must be greater than 0"); 194 } 195 if (maxSize < -1 || maxSize == 0) { 196 throw new IllegalArgumentException("maxSize must be -1 or greater than 0"); 197 } 198 this.priorities = priorities; 199 queues = new DelayQueue[priorities]; 200 for (int i = 0; i < priorities; i++) { 201 queues[i] = new DelayQueue<QueueElement<E>>(); 202 } 203 this.maxWait = unit.toMillis(maxWait); 204 this.maxSize = maxSize; 205 if (maxSize != -1) { 206 currentSize = new AtomicInteger(); 207 } 208 } 209 210 /** 211 * Return number of priorities the queue supports. 212 * 213 * @return number of priorities the queue supports. 214 */ 215 public int getPriorities() { 216 return priorities; 217 } 218 219 /** 220 * Return the max wait time for elements before they are promoted to the next higher priority. 221 * 222 * @param unit time unit of the max wait time. 223 * 224 * @return the max wait time in the specified time unit. 225 */ 226 public long getMaxWait(TimeUnit unit) { 227 return unit.convert(maxWait, TimeUnit.MILLISECONDS); 228 } 229 230 /** 231 * Return the maximum queue size. 232 * 233 * @return the maximum queue size. If <code>-1</code> the queue is unbounded. 234 */ 235 public long getMaxSize() { 236 return maxSize; 237 } 238 239 /** 240 * Return an iterator over all the {@link QueueElement} elements (both expired and unexpired) in this queue. The 241 * iterator does not return the elements in any particular order. The returned <tt>Iterator</tt> is a "weakly 242 * consistent" iterator that will never throw {@link ConcurrentModificationException}, and guarantees to traverse 243 * elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any 244 * modifications subsequent to construction. 245 * 246 * @return an iterator over the {@link QueueElement} elements in this queue. 247 */ 248 @Override 249 @SuppressWarnings("unchecked") 250 public Iterator<QueueElement<E>> iterator() { 251 QueueElement[][] queueElements = new QueueElement[queues.length][]; 252 lock.lock(); 253 try { 254 for (int i = 0; i < queues.length; i++) { 255 queueElements[i] = queues[i].toArray(new QueueElement[0]); 256 } 257 } 258 finally { 259 lock.unlock(); 260 } 261 List<QueueElement<E>> list = new ArrayList<QueueElement<E>>(); 262 for (QueueElement[] elements : queueElements) { 263 list.addAll(Arrays.asList((QueueElement<E>[]) elements)); 264 } 265 return list.iterator(); 266 } 267 268 /** 269 * Return the number of elements in the queue. 270 * 271 * @return the number of elements in the queue. 272 */ 273 @Override 274 public int size() { 275 int size = 0; 276 for (DelayQueue<QueueElement<E>> queue : queues) { 277 size += queue.size(); 278 } 279 return size; 280 } 281 282 /** 283 * Return the number of elements on each priority sub-queue. 284 * 285 * @return the number of elements on each priority sub-queue. 286 */ 287 public int[] sizes() { 288 int[] sizes = new int[queues.length]; 289 for (int i = 0; i < queues.length; i++) { 290 sizes[i] = queues[i].size(); 291 } 292 return sizes; 293 } 294 295 /** 296 * Inserts the specified element into this queue if it is possible to do 297 * so immediately without violating capacity restrictions, returning 298 * <tt>true</tt> upon success and throwing an 299 * <tt>IllegalStateException</tt> if no space is currently available. 300 * When using a capacity-restricted queue, it is generally preferable to 301 * use {@link #offer(Object) offer}. 302 * 303 * @param queueElement the {@link QueueElement} element to add. 304 * @return <tt>true</tt> (as specified by {@link Collection#add}) 305 * @throws IllegalStateException if the element cannot be added at this 306 * time due to capacity restrictions 307 * @throws ClassCastException if the class of the specified element 308 * prevents it from being added to this queue 309 * @throws NullPointerException if the specified element is null 310 * @throws IllegalArgumentException if some property of the specified 311 * element prevents it from being added to this queue 312 */ 313 @Override 314 public boolean add(QueueElement<E> queueElement) { 315 return offer(queueElement, false); 316 } 317 318 /** 319 * Insert the specified {@link QueueElement} element into the queue. 320 * 321 * @param queueElement the {@link QueueElement} element to add. 322 * @param ignoreSize if the queue is bound to a maximum size and the maximum size is reached, this parameter (if set 323 * to <tt>true</tt>) allows to ignore the maximum size and add the element to the queue. 324 * 325 * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue 326 * has reached its maximum size). 327 * 328 * @throws NullPointerException if the specified element is null 329 */ 330 boolean offer(QueueElement<E> queueElement, boolean ignoreSize) { 331 ParamChecker.notNull(queueElement, "queueElement"); 332 if (queueElement.getPriority() < 0 || queueElement.getPriority() >= priorities) { 333 throw new IllegalArgumentException("priority out of range: " + queueElement); 334 } 335 if (queueElement.inQueue) { 336 throw new IllegalStateException("queueElement already in a queue: " + queueElement); 337 } 338 if (!ignoreSize && currentSize != null && currentSize.get() >= maxSize) { 339 return false; 340 } 341 boolean accepted = queues[queueElement.getPriority()].offer(queueElement); 342 debug("offer([{0}]), to P[{1}] delay[{2}ms] accepted[{3}]", queueElement.getElement().toString(), 343 queueElement.getPriority(), queueElement.getDelay(TimeUnit.MILLISECONDS), accepted); 344 if (accepted) { 345 if (currentSize != null) { 346 currentSize.incrementAndGet(); 347 } 348 queueElement.inQueue = true; 349 } 350 return accepted; 351 } 352 353 /** 354 * Insert the specified element into the queue. 355 * <p> 356 * The element is added with minimun priority and no delay. 357 * 358 * @param queueElement the element to add. 359 * 360 * @return <tt>true</tt> if the element has been inserted, <tt>false</tt> if the element was not inserted (the queue 361 * has reached its maximum size). 362 * 363 * @throws NullPointerException if the specified element is null 364 */ 365 @Override 366 public boolean offer(QueueElement<E> queueElement) { 367 return offer(queueElement, false); 368 } 369 370 /** 371 * Retrieve and remove the head of this queue, or return <tt>null</tt> if this queue has no elements with an expired 372 * delay. 373 * <p> 374 * The retrieved element is the oldest one from the highest priority sub-queue. 375 * <p> 376 * Invocations to this method run the anti-starvation (once every interval check). 377 * 378 * @return the head of this queue, or <tt>null</tt> if this queue has no elements with an expired delay. 379 */ 380 @Override 381 public QueueElement<E> poll() { 382 lock.lock(); 383 try { 384 antiStarvation(); 385 QueueElement<E> e = null; 386 int i = priorities; 387 for (; e == null && i > 0; i--) { 388 e = queues[i - 1].poll(); 389 } 390 if (e != null) { 391 if (currentSize != null) { 392 currentSize.decrementAndGet(); 393 } 394 e.inQueue = false; 395 debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i); 396 } 397 return e; 398 } 399 finally { 400 lock.unlock(); 401 } 402 } 403 404 /** 405 * Retrieve, but does not remove, the head of this queue, or returns <tt>null</tt> if this queue is empty. Unlike 406 * <tt>poll</tt>, if no expired elements are available in the queue, this method returns the element that will 407 * expire next, if one exists. 408 * 409 * @return the head of this queue, or <tt>null</tt> if this queue is empty. 410 */ 411 @Override 412 public QueueElement<E> peek() { 413 lock.lock(); 414 try { 415 antiStarvation(); 416 QueueElement<E> e = null; 417 418 QueueElement<E> [] seeks = new QueueElement[priorities]; 419 boolean foundElement = false; 420 for (int i = priorities - 1; i > -1; i--) { 421 e = queues[i].peek(); 422 debug("peek(): considering [{0}] from P[{1}]", e, i); 423 seeks[priorities - i - 1] = e; 424 foundElement |= e != null; 425 } 426 if (foundElement) { 427 e = null; 428 for (int i = 0; e == null && i < priorities; i++) { 429 if (seeks[i] != null && seeks[i].getDelay(TimeUnit.MILLISECONDS) > 0) { 430 debug("peek, ignoring [{0}]", seeks[i]); 431 } 432 else { 433 e = seeks[i]; 434 } 435 } 436 if (e != null) { 437 debug("peek(): choosing [{0}]", e); 438 } 439 if (e == null) { 440 int first; 441 for (first = 0; e == null && first < priorities; first++) { 442 e = seeks[first]; 443 } 444 if (e != null) { 445 debug("peek(): initial choosing [{0}]", e); 446 } 447 for (int i = first; i < priorities; i++) { 448 QueueElement<E> ee = seeks[i]; 449 if (ee != null && ee.getDelay(TimeUnit.MILLISECONDS) < e.getDelay(TimeUnit.MILLISECONDS)) { 450 debug("peek(): choosing [{0}] over [{1}]", ee, e); 451 e = ee; 452 } 453 } 454 } 455 } 456 if (e != null) { 457 debug("peek(): [{0}], from P[{1}]", e.getElement().toString(), e.getPriority()); 458 } 459 else { 460 debug("peek(): NULL"); 461 } 462 return e; 463 } 464 finally { 465 lock.unlock(); 466 } 467 } 468 469 /** 470 * Run the anti-starvation check every {@link #ANTI_STARVATION_INTERVAL} milliseconds. 471 * <p> 472 * It promotes elements beyond max wait time to the next higher priority sub-queue. 473 */ 474 protected void antiStarvation() { 475 long now = System.currentTimeMillis(); 476 if (now - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL) { 477 for (int i = 0; i < queues.length - 1; i++) { 478 antiStarvation(queues[i], queues[i + 1], "from P[" + i + "] to P[" + (i + 1) + "]"); 479 } 480 StringBuilder sb = new StringBuilder(); 481 for (int i = 0; i < queues.length; i++) { 482 sb.append("P[").append(i).append("]=").append(queues[i].size()).append(" "); 483 } 484 debug("sub-queue sizes: {0}", sb.toString()); 485 lastAntiStarvationCheck = System.currentTimeMillis(); 486 } 487 } 488 489 /** 490 * Promote elements beyond max wait time from a lower priority sub-queue to a higher priority sub-queue. 491 * 492 * @param lowerQ lower priority sub-queue. 493 * @param higherQ higher priority sub-queue. 494 * @param msg sub-queues msg (from-to) for debugging purposes. 495 */ 496 private void antiStarvation(DelayQueue<QueueElement<E>> lowerQ, DelayQueue<QueueElement<E>> higherQ, String msg) { 497 int moved = 0; 498 QueueElement<E> e = lowerQ.poll(); 499 while (e != null && e.getDelay(TimeUnit.MILLISECONDS) < -maxWait) { 500 e.setDelay(0, TimeUnit.MILLISECONDS); 501 if (!higherQ.offer(e)) { 502 throw new IllegalStateException("Could not move element to higher sub-queue, element rejected"); 503 } 504 e.priority++; 505 e = lowerQ.poll(); 506 moved++; 507 } 508 if (e != null) { 509 if (!lowerQ.offer(e)) { 510 throw new IllegalStateException("Could not reinsert element to current sub-queue, element rejected"); 511 } 512 } 513 debug("anti-starvation, moved {0} element(s) {1}", moved, msg); 514 } 515 516 /** 517 * Method for debugging purposes. This implementation is a <tt>NOP</tt>. 518 * <p> 519 * This method should be overriden for logging purposes. 520 * <p> 521 * Message templates used by this class are in JDK's <tt>MessageFormat</tt> syntax. 522 * 523 * @param msgTemplate message template. 524 * @param msgArgs arguments for the message template. 525 */ 526 protected void debug(String msgTemplate, Object... msgArgs) { 527 } 528 529 /** 530 * Insert the specified element into this queue, waiting if necessary 531 * for space to become available. 532 * <p> 533 * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way. 534 * 535 * @param e the element to add 536 * @throws InterruptedException if interrupted while waiting 537 * @throws ClassCastException if the class of the specified element 538 * prevents it from being added to this queue 539 * @throws NullPointerException if the specified element is null 540 * @throws IllegalArgumentException if some property of the specified 541 * element prevents it from being added to this queue 542 */ 543 @Override 544 public void put(QueueElement<E> e) throws InterruptedException { 545 while (!offer(e, true)) { 546 Thread.sleep(10); 547 } 548 } 549 550 /** 551 * Insert the specified element into this queue, waiting up to the 552 * specified wait time if necessary for space to become available. 553 * <p> 554 * IMPORTANT: This implementation forces the addition of the element to the queue regardless 555 * of the queue current size. The timeout value is ignored as the element is added immediately. 556 * <p> 557 * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way. 558 * 559 * @param e the element to add 560 * @param timeout how long to wait before giving up, in units of 561 * <tt>unit</tt> 562 * @param unit a <tt>TimeUnit</tt> determining how to interpret the 563 * <tt>timeout</tt> parameter 564 * @return <tt>true</tt> if successful, or <tt>false</tt> if 565 * the specified waiting time elapses before space is available 566 * @throws InterruptedException if interrupted while waiting 567 * @throws ClassCastException if the class of the specified element 568 * prevents it from being added to this queue 569 * @throws NullPointerException if the specified element is null 570 * @throws IllegalArgumentException if some property of the specified 571 * element prevents it from being added to this queue 572 */ 573 @Override 574 public boolean offer(QueueElement<E> e, long timeout, TimeUnit unit) throws InterruptedException { 575 return offer(e, true); 576 } 577 578 /** 579 * Retrieve and removes the head of this queue, waiting if necessary 580 * until an element becomes available. 581 * <p> 582 * IMPORTANT: This implementation has a delay of up to 10ms (when the queue is empty) to detect a new element 583 * is available. It is doing a 10ms sleep. 584 * <p> 585 * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way. 586 * 587 * @return the head of this queue 588 * @throws InterruptedException if interrupted while waiting 589 */ 590 @Override 591 public QueueElement<E> take() throws InterruptedException { 592 QueueElement<E> e = poll(); 593 while (e == null) { 594 Thread.sleep(10); 595 e = poll(); 596 } 597 return e; 598 } 599 600 /** 601 * Retrieve and removes the head of this queue, waiting up to the 602 * specified wait time if necessary for an element to become available. 603 * <p> 604 * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way. 605 * 606 * @param timeout how long to wait before giving up, in units of 607 * <tt>unit</tt> 608 * @param unit a <tt>TimeUnit</tt> determining how to interpret the 609 * <tt>timeout</tt> parameter 610 * @return the head of this queue, or <tt>null</tt> if the 611 * specified waiting time elapses before an element is available 612 * @throws InterruptedException if interrupted while waiting 613 */ 614 @Override 615 public QueueElement<E> poll(long timeout, TimeUnit unit) throws InterruptedException { 616 QueueElement<E> e = poll(); 617 long time = System.currentTimeMillis() + unit.toMillis(timeout); 618 while (e == null && time > System.currentTimeMillis()) { 619 Thread.sleep(10); 620 e = poll(); 621 } 622 return poll(); 623 } 624 625 /** 626 * Return the number of additional elements that this queue can ideally 627 * (in the absence of memory or resource constraints) accept without 628 * blocking, or <tt>Integer.MAX_VALUE</tt> if there is no intrinsic 629 * limit. 630 * 631 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 632 * an element will succeed by inspecting <tt>remainingCapacity</tt> 633 * because it may be the case that another thread is about to 634 * insert or remove an element. 635 * <p> 636 * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way. 637 * 638 * @return the remaining capacity 639 */ 640 @Override 641 public int remainingCapacity() { 642 return (maxSize == -1) ? -1 : maxSize - size(); 643 } 644 645 /** 646 * Remove all available elements from this queue and adds them 647 * to the given collection. This operation may be more 648 * efficient than repeatedly polling this queue. A failure 649 * encountered while attempting to add elements to 650 * collection <tt>c</tt> may result in elements being in neither, 651 * either or both collections when the associated exception is 652 * thrown. Attempt to drain a queue to itself result in 653 * <tt>IllegalArgumentException</tt>. Further, the behavior of 654 * this operation is undefined if the specified collection is 655 * modified while the operation is in progress. 656 * <p> 657 * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way. 658 * 659 * @param c the collection to transfer elements into 660 * @return the number of elements transferred 661 * @throws UnsupportedOperationException if addition of elements 662 * is not supported by the specified collection 663 * @throws ClassCastException if the class of an element of this queue 664 * prevents it from being added to the specified collection 665 * @throws NullPointerException if the specified collection is null 666 * @throws IllegalArgumentException if the specified collection is this 667 * queue, or some property of an element of this queue prevents 668 * it from being added to the specified collection 669 */ 670 @Override 671 public int drainTo(Collection<? super QueueElement<E>> c) { 672 int count = 0; 673 for (DelayQueue<QueueElement<E>> q : queues) { 674 count += q.drainTo(c); 675 } 676 return count; 677 } 678 679 /** 680 * Remove at most the given number of available elements from 681 * this queue and adds them to the given collection. A failure 682 * encountered while attempting to add elements to 683 * collection <tt>c</tt> may result in elements being in neither, 684 * either or both collections when the associated exception is 685 * thrown. Attempt to drain a queue to itself result in 686 * <tt>IllegalArgumentException</tt>. Further, the behavior of 687 * this operation is undefined if the specified collection is 688 * modified while the operation is in progress. 689 * <p> 690 * NOTE: This method is to fulfill the <tt>BlockingQueue</tt> interface. Not implemented in the most optimal way. 691 * 692 * @param c the collection to transfer elements into 693 * @param maxElements the maximum number of elements to transfer 694 * @return the number of elements transferred 695 * @throws UnsupportedOperationException if addition of elements 696 * is not supported by the specified collection 697 * @throws ClassCastException if the class of an element of this queue 698 * prevents it from being added to the specified collection 699 * @throws NullPointerException if the specified collection is null 700 * @throws IllegalArgumentException if the specified collection is this 701 * queue, or some property of an element of this queue prevents 702 * it from being added to the specified collection 703 */ 704 @Override 705 public int drainTo(Collection<? super QueueElement<E>> c, int maxElements) { 706 int left = maxElements; 707 int count = 0; 708 for (DelayQueue<QueueElement<E>> q : queues) { 709 int drained = q.drainTo(c, left); 710 count += drained; 711 left -= drained; 712 } 713 return count; 714 } 715 716 /** 717 * Removes all of the elements from this queue. The queue will be empty after this call returns. 718 */ 719 @Override 720 public void clear() { 721 for (DelayQueue<QueueElement<E>> q : queues) { 722 q.clear(); 723 } 724 } 725}