/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * A {@link PriorityDelayQueue} whose {@link #poll()} consults {@link #eligibleToPoll(QueueElement)} before
 * removing an element, and looks past a non-eligible head for another element of the same priority.
 *
 * @param <E> the type of the payload wrapped by each queue element.
 */
public class PollablePriorityDelayQueue<E> extends PriorityDelayQueue<E> {

    /**
     * Create a pollable priority delay queue; all arguments are passed unchanged to the parent constructor.
     *
     * @param priorities number of priorities.
     * @param maxWait maximum wait, expressed in <code>unit</code>.
     * @param unit time unit of <code>maxWait</code>.
     * @param maxSize maximum size of the queue.
     */
    public PollablePriorityDelayQueue(int priorities, long maxWait, TimeUnit unit, int maxSize) {
        super(priorities, maxWait, unit, maxSize);
    }

    /**
     * Retrieve and remove the head of this queue if it is eligible to poll. If it is not, iterate over the
     * remaining elements of the same priority until one that is eligible to poll is found, then move on to the
     * next lower priority.
     * <p>
     * Return {@code null} if this queue has no elements eligible to run.
     * <p>
     * Invocations to this method run the anti-starvation (once every interval check).
     *
     * @return the element of this queue, for which eligibleToPoll is true, or {@code null} if there is none.
     */
    @Override
    public QueueElement<E> poll() {
        // Acquire the lock before entering the try block: if lock() itself fails, the finally block must not
        // call unlock() on a lock this thread does not hold.
        lock.lock();
        try {
            antiStarvation();
            QueueElement<E> e = null;
            int i = priorities;
            // Scan from the highest priority queue (index priorities - 1) down to index 0.
            for (; e == null && i > 0; i--) {
                final int priority = i - 1;
                // May be null when this priority queue is empty; eligibleToPoll() is still consulted.
                e = queues[priority].peek();
                if (eligibleToPoll(e)) {
                    // The queue's own poll() decides whether the head can be handed out; it may return null.
                    e = queues[priority].poll();
                }
                else {
                    if (e != null) {
                        debug("poll(): the peek element [{0}], from P[{1}] is not eligible to poll",
                              e.getElement().toString(), priority);
                    }
                    e = null;
                    Iterator<QueueElement<E>> iter = queues[priority].iterator();
                    while (e == null && iter.hasNext()) {
                        e = iter.next();
                        // Iteration bypasses the delay handling that poll() applies to the head, so the delay
                        // must be checked explicitly before an element is taken out of the queue.
                        if (e.getDelay(TimeUnit.MILLISECONDS) <= 0 && eligibleToPoll(e)) {
                            queues[priority].remove(e);
                        }
                        else {
                            debug("poll(): the iterator element [{0}], from P[{1}] is not eligible to poll",
                                  e.getElement().toString(), priority);
                            e = null;
                        }
                    }
                }
            }
            if (e != null) {
                if (currentSize != null) {
                    currentSize.decrementAndGet();
                }
                e.inQueue = false;
                // The loop decrement already ran once after the element was found, so i is now the index of
                // the queue the element came from.
                debug("poll(): [{0}], from P[{1}]", e.getElement().toString(), i);
            }
            return e;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Method for checking the QueueElement eligible to poll before remove it from queue.
     * <p>
     * This method should be overridden for checking purposes. This default implementation accepts every element.
     *
     * @param element the element to check; {@link #poll()} passes {@code null} here when the priority queue
     *        being inspected is empty, so overriding implementations must tolerate {@code null}.
     * @return {@code true} if the element may be removed from the queue by {@link #poll()}.
     */
    protected boolean eligibleToPoll(QueueElement<?> element) {
        return true;
    }
}