001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.lock;
020
021import java.util.concurrent.ConcurrentMap;
022import java.util.concurrent.TimeUnit;
023import java.util.concurrent.locks.ReentrantReadWriteLock;
024import java.util.concurrent.locks.Lock;
025import org.apache.oozie.service.MemoryLocksService.Type;
026
027import com.google.common.collect.MapMaker;
028
029/**
030 * In memory resource locking that provides READ/WRITE lock capabilities.
031 */
032public class MemoryLocks {
033
034    final private ConcurrentMap<String, ReentrantReadWriteLock> locks = new MapMaker().weakValues().makeMap();
035
036    /**
037     * Implementation of {@link LockToken} for in memory locks.
038     */
039    class MemoryLockToken implements LockToken {
040        private final ReentrantReadWriteLock lockEntry;
041        private final Type type;
042
043        public MemoryLockToken(ReentrantReadWriteLock lockEntry, Type type) {
044            this.lockEntry = lockEntry;
045            this.type = type;
046
047        }
048
049        /**
050         * Release the lock.
051         */
052        @Override
053        public void release() {
054            switch (type) {
055                case WRITE:
056                    lockEntry.writeLock().unlock();
057                    break;
058                case READ:
059                    lockEntry.readLock().unlock();
060                    break;
061            }
062        }
063    }
064
065    /**
066     * Return the number of active locks.
067     *
068     * @return the number of active locks.
069     */
070    public int size() {
071        return locks.size();
072    }
073
074    /**
075     * Obtain a lock for a source.
076     *
077     * @param resource resource name.
078     * @param type lock type.
079     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
080     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
081     * @throws InterruptedException thrown if the thread was interrupted while waiting.
082     */
083    public MemoryLockToken getLock(final String resource, Type type, long wait) throws InterruptedException {
084        ReentrantReadWriteLock lockEntry = locks.get(resource);
085        if (lockEntry == null) {
086            ReentrantReadWriteLock newLock = new ReentrantReadWriteLock(true);
087            lockEntry = locks.putIfAbsent(resource, newLock);
088            if (lockEntry == null) {
089                lockEntry = newLock;
090            }
091        }
092        Lock lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
093
094        if (wait == -1) {
095            lock.lock();
096        }
097        else {
098            if (wait > 0) {
099                if (!lock.tryLock(wait, TimeUnit.MILLISECONDS)) {
100                    return null;
101                }
102            }
103            else {
104                if (!lock.tryLock()) {
105                    return null;
106                }
107            }
108        }
109        synchronized (locks) {
110            if (!locks.containsKey(resource)) {
111                locks.put(resource, lockEntry);
112            }
113        }
114        return new MemoryLockToken(lockEntry, type);
115    }
116
117    public ConcurrentMap<String, ReentrantReadWriteLock> getLockMap(){
118        return locks;
119    }
120}