001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.lock; 019 020import java.util.HashMap; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.locks.ReentrantReadWriteLock; 023import java.util.concurrent.locks.Lock; 024 025/** 026 * In memory resource locking that provides READ/WRITE lock capabilities. 027 */ 028public class MemoryLocks { 029 final private HashMap<String, ReentrantReadWriteLock> locks = new HashMap<String, ReentrantReadWriteLock>(); 030 031 private static enum Type { 032 READ, WRITE 033 } 034 035 /** 036 * Implementation of {@link LockToken} for in memory locks. 037 */ 038 class MemoryLockToken implements LockToken { 039 private final ReentrantReadWriteLock rwLock; 040 private final java.util.concurrent.locks.Lock lock; 041 private final String resource; 042 043 private MemoryLockToken(ReentrantReadWriteLock rwLock, java.util.concurrent.locks.Lock lock, String resource) { 044 this.rwLock = rwLock; 045 this.lock = lock; 046 this.resource = resource; 047 } 048 049 /** 050 * Release the lock. 051 */ 052 @Override 053 public void release() { 054 int val = rwLock.getQueueLength(); 055 if (val == 0) { 056 synchronized (locks) { 057 locks.remove(resource); 058 } 059 } 060 lock.unlock(); 061 } 062 } 063 064 /** 065 * Return the number of active locks. 066 * 067 * @return the number of active locks. 068 */ 069 public int size() { 070 return locks.size(); 071 } 072 073 /** 074 * Obtain a READ lock for a source. 075 * 076 * @param resource resource name. 077 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 078 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 079 * @throws InterruptedException thrown if the thread was interrupted while waiting. 080 */ 081 public MemoryLockToken getReadLock(String resource, long wait) throws InterruptedException { 082 return getLock(resource, Type.READ, wait); 083 } 084 085 /** 086 * Obtain a WRITE lock for a source. 087 * 088 * @param resource resource name. 089 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 090 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 091 * @throws InterruptedException thrown if the thread was interrupted while waiting. 092 */ 093 public MemoryLockToken getWriteLock(String resource, long wait) throws InterruptedException { 094 return getLock(resource, Type.WRITE, wait); 095 } 096 097 private MemoryLockToken getLock(String resource, Type type, long wait) throws InterruptedException { 098 ReentrantReadWriteLock lockEntry; 099 synchronized (locks) { 100 if (locks.containsKey(resource)) { 101 lockEntry = locks.get(resource); 102 } 103 else { 104 lockEntry = new ReentrantReadWriteLock(true); 105 locks.put(resource, lockEntry); 106 } 107 } 108 109 Lock lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock(); 110 111 if (wait == -1) { 112 lock.lock(); 113 } 114 else { 115 if (wait > 0) { 116 if (!lock.tryLock(wait, TimeUnit.MILLISECONDS)) { 117 return null; 118 } 119 } 120 else { 121 if (!lock.tryLock()) { 122 return null; 123 } 124 } 125 } 126 synchronized (locks) { 127 if (!locks.containsKey(resource)) { 128 locks.put(resource, lockEntry); 129 } 130 } 131 return new MemoryLockToken(lockEntry, lock, resource); 132 } 133}