This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.util;
019
020 import java.util.HashMap;
021 import java.util.concurrent.TimeUnit;
022 import java.util.concurrent.locks.ReentrantReadWriteLock;
023 import java.util.concurrent.locks.Lock;
024
/**
 * In memory resource locking that provides READ/WRITE lock capabilities.
 * <p>
 * One fair {@link ReentrantReadWriteLock} is kept per resource name for as long as at least one caller
 * is holding it or trying to acquire it. The entry is discarded when its last user goes away, so the
 * registry does not grow with resources that are no longer locked.
 * <p>
 * All access to the registry (including {@link #size()}) is guarded by the monitor of the registry map.
 */
public class MemoryLocks {
    /** Registry of live lock entries keyed by resource name; guarded by its own monitor. */
    final private HashMap<String, LockEntry> locks = new HashMap<String, LockEntry>();

    private static enum Type {
        READ, WRITE
    }

    /**
     * Registry entry: the lock of one resource plus the number of callers currently using it.
     */
    private static final class LockEntry {
        /** Fair lock so that waiters are served in arrival order. */
        private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);

        /**
         * Number of callers that hold this lock or are in the middle of acquiring it.
         * Only read or written while holding the monitor of {@code locks}.
         */
        private int users;
    }

    /**
     * Lock token returned when obtaining a lock, the token must be released when the lock is not needed anymore.
     * <p>
     * Each token must be released exactly once, from the thread that obtained it (the underlying
     * {@link ReentrantReadWriteLock} rejects an unlock from a thread that does not hold the lock).
     */
    public class LockToken {
        private final LockEntry entry;
        private final Lock lock;
        private final String resource;

        private LockToken(LockEntry entry, Lock lock, String resource) {
            this.entry = entry;
            this.lock = lock;
            this.resource = resource;
        }

        /**
         * Release the lock.
         */
        public void release() {
            // Unlock first: if unlock() throws (e.g. called from a thread that does not hold the lock)
            // the user count is left untouched and the registry stays consistent.
            lock.unlock();
            unregister(resource, entry);
        }
    }

    /**
     * Return the number of active locks.
     *
     * @return the number of resources that currently have at least one holder or pending acquirer.
     */
    public int size() {
        // Same monitor as every other access to the map; an unsynchronized read of a HashMap that
        // other threads mutate is a data race.
        synchronized (locks) {
            return locks.size();
        }
    }

    /**
     * Obtain a READ lock for a resource.
     *
     * @param resource resource name.
     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
     * @throws InterruptedException thrown if the thread was interrupted while waiting.
     */
    public LockToken getReadLock(String resource, long wait) throws InterruptedException {
        return getLock(resource, Type.READ, wait);
    }

    /**
     * Obtain a WRITE lock for a resource.
     *
     * @param resource resource name.
     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
     * @throws InterruptedException thrown if the thread was interrupted while waiting.
     */
    public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
        return getLock(resource, Type.WRITE, wait);
    }

    /**
     * Acquire the requested kind of lock on a resource.
     *
     * @param resource resource name.
     * @param type kind of lock to acquire.
     * @param wait -1 blocks until the lock is obtained, a positive value is a time out in milliseconds,
     *        any other value makes a single non-blocking attempt.
     * @return the lock token, or <code>null</code> if the lock could not be obtained.
     * @throws InterruptedException thrown if the thread was interrupted during a timed wait.
     */
    private LockToken getLock(String resource, Type type, long wait) throws InterruptedException {
        LockEntry entry = register(resource);
        Lock lock = (type == Type.READ) ? entry.rwLock.readLock() : entry.rwLock.writeLock();

        boolean acquired = false;
        try {
            if (wait == -1) {
                lock.lock();
                acquired = true;
            }
            else if (wait > 0) {
                acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
            }
            else {
                acquired = lock.tryLock();
            }
        }
        finally {
            // Covers both a failed attempt and an InterruptedException: this caller no longer uses
            // the entry, so it must not keep it alive in the registry.
            if (!acquired) {
                unregister(resource, entry);
            }
        }
        return acquired ? new LockToken(entry, lock, resource) : null;
    }

    /**
     * Look up (or create) the entry of a resource and count the caller as one of its users.
     * <p>
     * Counting the caller before it starts waiting on the lock guarantees that a concurrent release
     * cannot drop the entry while the caller is about to queue on it.
     */
    private LockEntry register(String resource) {
        synchronized (locks) {
            LockEntry entry = locks.get(resource);
            if (entry == null) {
                entry = new LockEntry();
                locks.put(resource, entry);
            }
            entry.users++;
            return entry;
        }
    }

    /**
     * Drop one user from the entry and remove the entry from the registry once nobody uses it.
     * <p>
     * An entry with a positive user count is never removed or replaced, so every holder and waiter
     * of a given resource always shares the same lock instance.
     */
    private void unregister(String resource, LockEntry entry) {
        synchronized (locks) {
            entry.users--;
            if (entry.users == 0) {
                locks.remove(resource);
            }
        }
    }

}