This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.util;
019
020 import java.util.Iterator;
021 import java.util.concurrent.TimeUnit;
022
023 public class PollablePriorityDelayQueue<E> extends PriorityDelayQueue<E> {
024
025 public PollablePriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
026 super(priorities, maxWait, unit, maxSize);
027 }
028
029 /**
030 * Retrieve and remove the head of this queue if it is eligible to poll. If not, iterate next element until find
031 * the one which is eligible to poll from queue.
032 *
033 * Return <tt>null</tt> if this queue has no elements eligible to run.
034 *
035 * <p/>
036 * Invocations to this method run the anti-starvation (once every interval check).
037 *
038 * @return the element of this queue, for which eligibleToPoll is true.
039 */
040 @Override
041 public QueueElement<E> poll() {
042 try {
043 lock.lock();
044 antiStarvation();
045 QueueElement<E> e = null;
046 int i = priorities;
047 for (; e == null && i > 0; i--) {
048 e = queues[i - 1].peek();
049 if (eligibleToPoll(e)) {
050 e = queues[i - 1].poll();
051 }
052 else {
053 if (e != null) {
054 debug("poll(): the peek element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
055 }
056 e = null;
057 Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
058 while(e == null && iter.hasNext()) {
059 e = iter.next();
060 if (eligibleToPoll(e)) {
061 queues[i - 1].remove(e);
062 }
063 else {
064 debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
065 e = null;
066 }
067 }
068 }
069 }
070 if (e != null) {
071 if (currentSize != null) {
072 currentSize.decrementAndGet();
073 }
074 e.inQueue = false;
075 debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i);
076 }
077 return e;
078 }
079 finally {
080 lock.unlock();
081 }
082 }
083
084 /**
085 * Method for checking the QueueElement eligible to poll before remove it from queue.
086 * <p/>
087 * This method should be overriden for checking purposes.
088 *
089 * @param element the element to check
090 */
091 protected boolean eligibleToPoll(QueueElement<?> element) {
092 return true;
093 }
094 }