/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.Lock;

/**
 * In memory resource locking that provides READ/WRITE lock capabilities.
 * <p>
 * One fair {@link ReentrantReadWriteLock} is kept per resource name. The entry for a resource stays registered
 * for as long as at least one thread holds a token for it or is still trying to obtain one, so every caller
 * asking for the same resource is guaranteed to contend on the same underlying lock.
 */
public class MemoryLocks {
    // Every access to this map, and to LockEntry.users, must happen while holding the map's monitor.
    final private HashMap<String, LockEntry> locks = new HashMap<String, LockEntry>();

    private enum Type {
        READ, WRITE
    }

    /**
     * Per-resource bookkeeping: the shared read/write lock plus the number of threads that currently hold a
     * token for it or are in the middle of trying to acquire it.
     */
    private static final class LockEntry {
        private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
        // Guarded by the monitor of the enclosing MemoryLocks.locks map.
        private int users;
    }

    /**
     * Lock token returned when obtaining a lock, the token must be released when the lock is not needed anymore.
     */
    public class LockToken {
        private final LockEntry entry;
        private final Lock lock;
        private final String resource;

        private LockToken(LockEntry entry, Lock lock, String resource) {
            this.entry = entry;
            this.lock = lock;
            this.resource = resource;
        }

        /**
         * Release the lock.
         * <p>
         * The resource entry is dropped from the registry only once no other token holder or pending requester
         * is using it.
         */
        public void release() {
            // Unlock first: if unlock() fails (e.g. wrong thread), the bookkeeping must stay untouched.
            lock.unlock();
            dropUser(resource, entry);
        }
    }

    /**
     * Return the number of active locks.
     *
     * @return the number of resources that currently have a lock entry registered.
     */
    public int size() {
        synchronized (locks) {
            return locks.size();
        }
    }

    /**
     * Obtain a READ lock for a resource.
     *
     * @param resource resource name.
     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
     * @throws InterruptedException thrown if the thread was interrupted while waiting.
     */
    public LockToken getReadLock(String resource, long wait) throws InterruptedException {
        return getLock(resource, Type.READ, wait);
    }

    /**
     * Obtain a WRITE lock for a resource.
     *
     * @param resource resource name.
     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
     * @throws InterruptedException thrown if the thread was interrupted while waiting.
     */
    public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
        return getLock(resource, Type.WRITE, wait);
    }

    /**
     * Look up (or create) the entry for a resource and try to acquire the requested side of its lock.
     *
     * @param resource resource name.
     * @param type whether the read or the write side of the lock is wanted.
     * @param wait -1 blocks until the lock is obtained, a positive value waits at most that many milliseconds,
     *        any other value makes a single attempt without waiting.
     * @return a token for the acquired lock, or <code>null</code> if it could not be obtained.
     * @throws InterruptedException thrown if the thread was interrupted during a timed wait.
     */
    private LockToken getLock(String resource, Type type, long wait) throws InterruptedException {
        LockEntry entry;
        synchronized (locks) {
            entry = locks.get(resource);
            if (entry == null) {
                entry = new LockEntry();
                locks.put(resource, entry);
            }
            // Register interest before attempting to lock so a concurrent release() cannot drop the entry.
            entry.users++;
        }

        Lock lock = (type == Type.READ) ? entry.rwLock.readLock() : entry.rwLock.writeLock();

        boolean acquired = false;
        try {
            if (wait == -1) {
                lock.lock();
                acquired = true;
            }
            else if (wait > 0) {
                acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
            }
            else {
                acquired = lock.tryLock();
            }
        }
        finally {
            // Failed or interrupted attempts must give back the usage they registered above.
            if (!acquired) {
                dropUser(resource, entry);
            }
        }
        return acquired ? new LockToken(entry, lock, resource) : null;
    }

    /**
     * Unregister one user of an entry and remove the entry from the registry when nobody uses it anymore.
     */
    private void dropUser(String resource, LockEntry entry) {
        synchronized (locks) {
            entry.users--;
            if (entry.users == 0) {
                locks.remove(resource);
            }
        }
    }

}