/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.util;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * A {@link PriorityDelayQueue} whose {@link #poll()} only hands out elements accepted by
 * {@link #eligibleToPoll(QueueElement)}.
 * <p>
 * Subclasses override {@link #eligibleToPoll(QueueElement)} to veto individual elements; vetoed elements stay
 * in the queue and are skipped in favour of the next eligible one.
 *
 * @param <E> the type of the payload carried by the queue elements
 */
public class PollablePriorityDelayQueue<E> extends PriorityDelayQueue<E> {

    public PollablePriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
        super(priorities, maxWait, unit, maxSize);
    }

    /**
     * Retrieve and remove the head of this queue if it is eligible to poll. If it is not, iterate over the
     * remaining elements until one that is eligible to poll is found.
     * <p>
     * The per-priority queues are scanned from the highest index down to index 0, and the first element
     * found is returned. Return {@code null} if this queue has no elements eligible to run.
     * <p>
     * Invocations to this method run the anti-starvation (once every interval check).
     *
     * @return the element of this queue for which eligibleToPoll is true, or {@code null} if there is none.
     */
    @Override
    public QueueElement<E> poll() {
        lock.lock();
        try {
            antiStarvation();
            QueueElement<E> polled = null;
            // After the loop 'priority' is the array index of the queue the element came from.
            int priority = priorities;
            while (polled == null && priority > 0) {
                priority--;
                polled = pollEligibleFrom(priority);
            }
            if (polled != null) {
                if (currentSize != null) {
                    currentSize.decrementAndGet();
                }
                polled.inQueue = false;
                debug("poll(): [{0}], from P[{1}]", polled.getElement().toString(), priority);
            }
            return polled;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Remove and return one eligible element from the queue at the given index.
     * <p>
     * Must be called with {@code lock} held (it is only called from {@link #poll()}).
     *
     * @param priority index into {@code queues} of the queue to inspect
     * @return the removed element, or {@code null} if that queue yielded nothing
     */
    private QueueElement<E> pollEligibleFrom(int priority) {
        QueueElement<E> head = queues[priority].peek();
        if (head == null) {
            // Empty queue: nothing to offer, and the overridable hook is never handed a null element.
            return null;
        }
        if (eligibleToPoll(head)) {
            // poll() on the underlying queue may still return null (presumably when the head's delay has
            // not expired yet); the caller then moves on to the next queue.
            return queues[priority].poll();
        }
        debug("poll(): the peek element [{0}], from P[{1}] is not eligible to poll",
                head.getElement().toString(), priority);

        // The head was vetoed: look for any other element that is both expired and eligible.
        Iterator<QueueElement<E>> iter = queues[priority].iterator();
        while (iter.hasNext()) {
            QueueElement<E> candidate = iter.next();
            if (candidate.getDelay(TimeUnit.MILLISECONDS) <= 0 && eligibleToPoll(candidate)) {
                queues[priority].remove(candidate);
                return candidate;
            }
            debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll",
                    candidate.getElement().toString(), priority);
        }
        return null;
    }

    /**
     * Method for checking whether a QueueElement is eligible to poll before removing it from the queue.
     * <p>
     * This method should be overridden for checking purposes; this default implementation accepts every
     * element. {@link #poll()} invokes it while holding the queue lock and only with non-null elements.
     *
     * @param element the element to check
     * @return {@code true} if the element may be removed and returned by {@link #poll()}, {@code false} to
     *         leave it in the queue
     */
    protected boolean eligibleToPoll(QueueElement<?> element) {
        return true;
    }
}