001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.lock;
019
020import java.util.HashMap;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.locks.ReentrantReadWriteLock;
023import java.util.concurrent.locks.Lock;
024
025/**
026 * In memory resource locking that provides READ/WRITE lock capabilities.
027 */
028public class MemoryLocks {
029    final private HashMap<String, ReentrantReadWriteLock> locks = new HashMap<String, ReentrantReadWriteLock>();
030
031    private static enum Type {
032        READ, WRITE
033    }
034
035    /**
036     * Implementation of {@link LockToken} for in memory locks.
037     */
038    class MemoryLockToken implements LockToken {
039        private final ReentrantReadWriteLock rwLock;
040        private final java.util.concurrent.locks.Lock lock;
041        private final String resource;
042
043        private MemoryLockToken(ReentrantReadWriteLock rwLock, java.util.concurrent.locks.Lock lock, String resource) {
044            this.rwLock = rwLock;
045            this.lock = lock;
046            this.resource = resource;
047        }
048
049        /**
050         * Release the lock.
051         */
052        @Override
053        public void release() {
054            int val = rwLock.getQueueLength();
055            if (val == 0) {
056                synchronized (locks) {
057                    locks.remove(resource);
058                }
059            }
060            lock.unlock();
061        }
062    }
063
064    /**
065     * Return the number of active locks.
066     *
067     * @return the number of active locks.
068     */
069    public int size() {
070        return locks.size();
071    }
072
073    /**
074     * Obtain a READ lock for a source.
075     *
076     * @param resource resource name.
077     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
078     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
079     * @throws InterruptedException thrown if the thread was interrupted while waiting.
080     */
081    public MemoryLockToken getReadLock(String resource, long wait) throws InterruptedException {
082        return getLock(resource, Type.READ, wait);
083    }
084
085    /**
086     * Obtain a WRITE lock for a source.
087     *
088     * @param resource resource name.
089     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
090     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
091     * @throws InterruptedException thrown if the thread was interrupted while waiting.
092     */
093    public MemoryLockToken getWriteLock(String resource, long wait) throws InterruptedException {
094        return getLock(resource, Type.WRITE, wait);
095    }
096
097    private MemoryLockToken getLock(String resource, Type type, long wait) throws InterruptedException {
098        ReentrantReadWriteLock lockEntry;
099        synchronized (locks) {
100            if (locks.containsKey(resource)) {
101                lockEntry = locks.get(resource);
102            }
103            else {
104                lockEntry = new ReentrantReadWriteLock(true);
105                locks.put(resource, lockEntry);
106            }
107        }
108
109        Lock lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
110
111        if (wait == -1) {
112            lock.lock();
113        }
114        else {
115            if (wait > 0) {
116                if (!lock.tryLock(wait, TimeUnit.MILLISECONDS)) {
117                    return null;
118                }
119            }
120            else {
121                if (!lock.tryLock()) {
122                    return null;
123                }
124            }
125        }
126        synchronized (locks) {
127            if (!locks.containsKey(resource)) {
128                locks.put(resource, lockEntry);
129            }
130        }
131        return new MemoryLockToken(lockEntry, lock, resource);
132    }
133}