001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import java.util.Iterator;
022import java.util.concurrent.TimeUnit;
023
024public class PollablePriorityDelayQueue<E> extends PriorityDelayQueue<E> {
025
026    public PollablePriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
027        super(priorities, maxWait, unit, maxSize);
028    }
029
030    /**
031     * Retrieve and remove the head of this queue if it is eligible to poll. If not, iterate next element until find
032     * the one which is eligible to poll from queue.
033     *
034     * Return <tt>null</tt> if this queue has no elements eligible to run.
035     *
036     * <p>
037     * Invocations to this method run the anti-starvation (once every interval check).
038     *
039     * @return the element of this queue, for which eligibleToPoll is true.
040     */
041    @Override
042    public QueueElement<E> poll() {
043        lock.lock();
044        try {
045            antiStarvation();
046            QueueElement<E> e = null;
047            int i = priorities;
048            for (; e == null && i > 0; i--) {
049                e = queues[i - 1].peek();
050                if (eligibleToPoll(e)) {
051                    e = queues[i - 1].poll();
052                }
053                else {
054                    if (e != null) {
055                        debug("poll(): the peek element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
056                    }
057                    e = null;
058                    Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
059                    while(e == null && iter.hasNext()) {
060                        e = iter.next();
061                        if (e.getDelay(TimeUnit.MILLISECONDS) <= 0 && eligibleToPoll(e)) {
062                            queues[i - 1].remove(e);
063                        }
064                        else {
065                            debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
066                            e = null;
067                        }
068                    }
069                }
070            }
071            if (e != null) {
072                if (currentSize != null) {
073                    currentSize.decrementAndGet();
074                }
075                e.inQueue = false;
076                debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i);
077            }
078            return e;
079        }
080        finally {
081            lock.unlock();
082        }
083    }
084
085    /**
086     * Method for checking the QueueElement eligible to poll before remove it from queue.
087     * <p>
088     * This method should be overriden for checking purposes.
089     *
090     * @param element the element to check
091     */
092    protected boolean eligibleToPoll(QueueElement<?> element) {
093        return true;
094    }
095}