001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.util;
019    
020    import java.util.HashMap;
021    import java.util.concurrent.TimeUnit;
022    import java.util.concurrent.locks.ReentrantReadWriteLock;
023    import java.util.concurrent.locks.Lock;
024    
025    /**
026     * In memory resource locking that provides READ/WRITE lock capabilities.
027     */
028    public class MemoryLocks {
029        final private HashMap<String, ReentrantReadWriteLock> locks = new HashMap<String, ReentrantReadWriteLock>();
030    
031        private static enum Type {
032            READ, WRITE
033        }
034    
035        /**
036         * Lock token returned when obtaining a lock, the token must be released when the lock is not needed anymore.
037         */
038        public class LockToken {
039            private final ReentrantReadWriteLock rwLock;
040            private final java.util.concurrent.locks.Lock lock;
041            private final String resource;
042    
043            private LockToken(ReentrantReadWriteLock rwLock, java.util.concurrent.locks.Lock lock, String resource) {
044                this.rwLock = rwLock;
045                this.lock = lock;
046                this.resource = resource;
047            }
048    
049            /**
050             * Release the lock.
051             */
052            public void release() {
053                int val = rwLock.getQueueLength();
054                if (val == 0) {
055                    synchronized (locks) {
056                        locks.remove(resource);
057                    }
058                }
059                lock.unlock();
060            }
061        }
062    
063        /**
064         * Return the number of active locks.
065         *
066         * @return the number of active locks.
067         */
068        public int size() {
069            return locks.size();
070        }
071    
072        /**
073         * Obtain a READ lock for a source.
074         *
075         * @param resource resource name.
076         * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
077         * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
078         * @throws InterruptedException thrown if the thread was interrupted while waiting.
079         */
080        public LockToken getReadLock(String resource, long wait) throws InterruptedException {
081            return getLock(resource, Type.READ, wait);
082        }
083    
084        /**
085         * Obtain a WRITE lock for a source.
086         *
087         * @param resource resource name.
088         * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
089         * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
090         * @throws InterruptedException thrown if the thread was interrupted while waiting.
091         */
092        public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
093            return getLock(resource, Type.WRITE, wait);
094        }
095    
096        private LockToken getLock(String resource, Type type, long wait) throws InterruptedException {
097            ReentrantReadWriteLock lockEntry;
098            synchronized (locks) {
099                if (locks.containsKey(resource)) {
100                    lockEntry = locks.get(resource);
101                }
102                else {
103                    lockEntry = new ReentrantReadWriteLock(true);
104                    locks.put(resource, lockEntry);
105                }
106            }
107    
108            Lock lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
109    
110            if (wait == -1) {
111                lock.lock();
112            }
113            else {
114                if (wait > 0) {
115                    if (!lock.tryLock(wait, TimeUnit.MILLISECONDS)) {
116                        return null;
117                    }
118                }
119                else {
120                    if (!lock.tryLock()) {
121                        return null;
122                    }
123                }
124            }
125            synchronized (locks) {
126                if (!locks.containsKey(resource)) {
127                    locks.put(resource, lockEntry);
128                }
129            }
130            return new LockToken(lockEntry, lock, resource);
131        }
132    
133    }