001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import java.util.Iterator;
021import java.util.concurrent.TimeUnit;
022
023public class PollablePriorityDelayQueue<E> extends PriorityDelayQueue<E> {
024
025    public PollablePriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
026        super(priorities, maxWait, unit, maxSize);
027    }
028
029    /**
030     * Retrieve and remove the head of this queue if it is eligible to poll. If not, iterate next element until find
031     * the one which is eligible to poll from queue.
032     *
033     * Return <tt>null</tt> if this queue has no elements eligible to run.
034     *
035     * <p/>
036     * Invocations to this method run the anti-starvation (once every interval check).
037     *
038     * @return the element of this queue, for which eligibleToPoll is true.
039     */
040    @Override
041    public QueueElement<E> poll() {
042        lock.lock();
043        try {
044            antiStarvation();
045            QueueElement<E> e = null;
046            int i = priorities;
047            for (; e == null && i > 0; i--) {
048                e = queues[i - 1].peek();
049                if (eligibleToPoll(e)) {
050                    e = queues[i - 1].poll();
051                }
052                else {
053                    if (e != null) {
054                        debug("poll(): the peek element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
055                    }
056                    e = null;
057                    Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
058                    while(e == null && iter.hasNext()) {
059                        e = iter.next();
060                        if (e.getDelay(TimeUnit.MILLISECONDS) <= 0 && eligibleToPoll(e)) {
061                            queues[i - 1].remove(e);
062                        }
063                        else {
064                            debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
065                            e = null;
066                        }
067                    }
068                }
069            }
070            if (e != null) {
071                if (currentSize != null) {
072                    currentSize.decrementAndGet();
073                }
074                e.inQueue = false;
075                debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i);
076            }
077            return e;
078        }
079        finally {
080            lock.unlock();
081        }
082    }
083
084    /**
085     * Method for checking the QueueElement eligible to poll before remove it from queue.
086     * <p/>
087     * This method should be overriden for checking purposes.
088     *
089     * @param element the element to check
090     */
091    protected boolean eligibleToPoll(QueueElement<?> element) {
092        return true;
093    }
094}