001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.util;
019    
020    import java.util.Iterator;
021    import java.util.concurrent.TimeUnit;
022    
023    public class PollablePriorityDelayQueue<E> extends PriorityDelayQueue<E> {
024    
025        public PollablePriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
026            super(priorities, maxWait, unit, maxSize);
027        }
028    
029        /**
030         * Retrieve and remove the head of this queue if it is eligible to poll. If not, iterate next element until find
031         * the one which is eligible to poll from queue.
032         *
033         * Return <tt>null</tt> if this queue has no elements eligible to run.
034         *
035         * <p/>
036         * Invocations to this method run the anti-starvation (once every interval check).
037         *
038         * @return the element of this queue, for which eligibleToPoll is true.
039         */
040        @Override
041        public QueueElement<E> poll() {
042            try {
043                lock.lock();
044                antiStarvation();
045                QueueElement<E> e = null;
046                int i = priorities;
047                for (; e == null && i > 0; i--) {
048                    e = queues[i - 1].peek();
049                    if (eligibleToPoll(e)) {
050                        e = queues[i - 1].poll();
051                    }
052                    else {
053                        if (e != null) {
054                            debug("poll(): the peek element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
055                        }
056                        e = null;
057                        Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
058                        while(e == null && iter.hasNext()) {
059                            e = iter.next();
060                            if (eligibleToPoll(e)) {
061                                queues[i - 1].remove(e);
062                            }
063                            else {
064                                debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll", e.getElement().toString(), i);
065                                e = null;
066                            }
067                        }
068                    }
069                }
070                if (e != null) {
071                    if (currentSize != null) {
072                        currentSize.decrementAndGet();
073                    }
074                    e.inQueue = false;
075                    debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i);
076                }
077                return e;
078            }
079            finally {
080                lock.unlock();
081            }
082        }
083    
084        /**
085         * Method for checking the QueueElement eligible to poll before remove it from queue.
086         * <p/>
087         * This method should be overriden for checking purposes.
088         *
089         * @param element the element to check
090         */
091        protected boolean eligibleToPoll(QueueElement<?> element) {
092            return true;
093        }
094    }