001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.lock; 020 021import java.util.concurrent.ConcurrentMap; 022import java.util.concurrent.TimeUnit; 023import java.util.concurrent.locks.ReentrantReadWriteLock; 024import java.util.concurrent.locks.Lock; 025import org.apache.oozie.service.MemoryLocksService.Type; 026 027import com.google.common.collect.MapMaker; 028 029/** 030 * In memory resource locking that provides READ/WRITE lock capabilities. 031 */ 032public class MemoryLocks { 033 034 final private ConcurrentMap<String, ReentrantReadWriteLock> locks = new MapMaker().weakValues().makeMap(); 035 036 /** 037 * Implementation of {@link LockToken} for in memory locks. 038 */ 039 class MemoryLockToken implements LockToken { 040 private final ReentrantReadWriteLock lockEntry; 041 private final Type type; 042 043 public MemoryLockToken(ReentrantReadWriteLock lockEntry, Type type) { 044 this.lockEntry = lockEntry; 045 this.type = type; 046 047 } 048 049 /** 050 * Release the lock. 051 */ 052 @Override 053 public void release() { 054 switch (type) { 055 case WRITE: 056 lockEntry.writeLock().unlock(); 057 break; 058 case READ: 059 lockEntry.readLock().unlock(); 060 break; 061 } 062 } 063 } 064 065 /** 066 * Return the number of active locks. 067 * 068 * @return the number of active locks. 069 */ 070 public int size() { 071 return locks.size(); 072 } 073 074 /** 075 * Obtain a lock for a source. 076 * 077 * @param resource resource name. 078 * @param type lock type. 079 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 080 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 081 * @throws InterruptedException thrown if the thread was interrupted while waiting. 082 */ 083 public MemoryLockToken getLock(final String resource, Type type, long wait) throws InterruptedException { 084 ReentrantReadWriteLock lockEntry = locks.get(resource); 085 if (lockEntry == null) { 086 ReentrantReadWriteLock newLock = new ReentrantReadWriteLock(true); 087 lockEntry = locks.putIfAbsent(resource, newLock); 088 if (lockEntry == null) { 089 lockEntry = newLock; 090 } 091 } 092 Lock lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock(); 093 094 if (wait == -1) { 095 lock.lock(); 096 } 097 else { 098 if (wait > 0) { 099 if (!lock.tryLock(wait, TimeUnit.MILLISECONDS)) { 100 return null; 101 } 102 } 103 else { 104 if (!lock.tryLock()) { 105 return null; 106 } 107 } 108 } 109 synchronized (locks) { 110 if (!locks.containsKey(resource)) { 111 locks.put(resource, lockEntry); 112 } 113 } 114 return new MemoryLockToken(lockEntry, type); 115 } 116 117 public ConcurrentMap<String, ReentrantReadWriteLock> getLockMap(){ 118 return locks; 119 } 120}