001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.HashMap;
021import java.util.concurrent.TimeUnit;
022import org.apache.curator.framework.recipes.locks.InterProcessMutex;
023import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
024import org.apache.oozie.ErrorCode;
025import org.apache.oozie.util.Instrumentable;
026import org.apache.oozie.util.Instrumentation;
027import org.apache.oozie.lock.LockToken;
028import org.apache.oozie.util.XLog;
029import org.apache.oozie.util.ZKUtils;
030import java.io.IOException;
031import java.util.concurrent.ScheduledExecutorService;
032import org.apache.curator.framework.recipes.locks.ChildReaper;
033import org.apache.curator.framework.recipes.locks.Reaper;
034import org.apache.curator.utils.ThreadUtils;
035import com.google.common.annotations.VisibleForTesting;
036
037/**
038 * Service that provides distributed locks via ZooKeeper.  Requires that a ZooKeeper ensemble is available.  The locks will be
039 * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}).  For example, with default settings, if the
040 * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo.
041 */
042public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable {
043
044    private ZKUtils zk;
045    private static XLog LOG = XLog.getLog(ZKLocksService.class);
046    public static final String LOCKS_NODE = "/locks";
047
048    final private HashMap<String, InterProcessReadWriteLock> zkLocks = new HashMap<String, InterProcessReadWriteLock>();
049
050    private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
051    public static final int DEFAULT_REAPING_THRESHOLD = 300; // In sec
052    public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold";
053    public static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads";
054    private ChildReaper reaper = null;
055
056    /**
057     * Initialize the zookeeper locks service
058     *
059     * @param services services instance.
060     */
061    @Override
062    public void init(Services services) throws ServiceException {
063        super.init(services);
064        try {
065            zk = ZKUtils.register(this);
066            reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_INDEFINITELY, getExecutorService(),
067                    services.getConf().getInt(REAPING_THRESHOLD, DEFAULT_REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH);
068            reaper.start();
069        }
070        catch (Exception ex) {
071            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
072        }
073    }
074
075    /**
076     * Destroy the zookeeper locks service.
077     */
078    @Override
079    public void destroy() {
080        if (reaper != null) {
081            try {
082                reaper.close();
083            }
084            catch (IOException e) {
085                LOG.error("Error closing childReaper", e);
086            }
087        }
088
089        if (zk != null) {
090            zk.unregister(this);
091        }
092        zk = null;
093        super.destroy();
094    }
095
096    /**
097     * Instruments the zookeeper locks service.
098     *
099     * @param instr instance to instrument the memory locks service to.
100     */
101    @Override
102    public void instrument(Instrumentation instr) {
103        // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has
104        instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() {
105            @Override
106            public Integer getValue() {
107                return zkLocks.size();
108            }
109        });
110    }
111
112    /**
113     * Obtain a READ lock for a source.
114     *
115     * @param resource resource name.
116     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
117     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
118     * @throws InterruptedException thrown if the thread was interrupted while waiting.
119     */
120    @Override
121    public LockToken getReadLock(String resource, long wait) throws InterruptedException {
122        InterProcessReadWriteLock lockEntry;
123        synchronized (zkLocks) {
124            if (zkLocks.containsKey(resource)) {
125                lockEntry = zkLocks.get(resource);
126            }
127            else {
128                lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
129                zkLocks.put(resource, lockEntry);
130            }
131        }
132        InterProcessMutex readLock = lockEntry.readLock();
133        return acquireLock(wait, readLock, resource);
134    }
135
136    /**
137     * Obtain a WRITE lock for a source.
138     *
139     * @param resource resource name.
140     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
141     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
142     * @throws InterruptedException thrown if the thread was interrupted while waiting.
143     */
144    @Override
145    public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
146        InterProcessReadWriteLock lockEntry;
147        synchronized (zkLocks) {
148            if (zkLocks.containsKey(resource)) {
149                lockEntry = zkLocks.get(resource);
150            }
151            else {
152                lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
153                zkLocks.put(resource, lockEntry);
154            }
155        }
156        InterProcessMutex writeLock = lockEntry.writeLock();
157        return acquireLock(wait, writeLock, resource);
158    }
159
160    private LockToken acquireLock(long wait, InterProcessMutex lock, String resource) {
161        ZKLockToken token = null;
162        try {
163            if (wait == -1) {
164                lock.acquire();
165                token = new ZKLockToken(lock, resource);
166            }
167            else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) {
168                token = new ZKLockToken(lock, resource);
169            }
170        }
171        catch (Exception ex) {
172            throw new RuntimeException(ex);
173        }
174        return token;
175    }
176
177    /**
178     * Implementation of {@link LockToken} for zookeeper locks.
179     */
180    class ZKLockToken implements LockToken {
181        private final InterProcessMutex lock;
182        private final String resource;
183
184        private ZKLockToken(InterProcessMutex lock, String resource) {
185            this.lock = lock;
186            this.resource = resource;
187        }
188
189        /**
190         * Release the lock.
191         */
192        @Override
193        public void release() {
194            try {
195                lock.release();
196                int val = lock.getParticipantNodes().size();
197                //TODO this might break, when count is zero and before we remove lock, same thread may ask for same lock.
198                // Hashmap will return the lock, but eventually release will remove it from hashmap and a immediate getlock will
199                //create a new instance. Will fix this as part of OOZIE-1922
200                if (val == 0) {
201                    synchronized (zkLocks) {
202                        zkLocks.remove(resource);
203                    }
204                }
205            }
206            catch (Exception ex) {
207                LOG.warn("Could not release lock: " + ex.getMessage(), ex);
208            }
209
210        }
211    }
212
213    @VisibleForTesting
214    public HashMap<String, InterProcessReadWriteLock> getLocks(){
215        return zkLocks;
216    }
217
218    private static ScheduledExecutorService getExecutorService() {
219        return ThreadUtils.newFixedThreadScheduledPool(Services.get().getConf().getInt(REAPING_THREADS, 2),
220                "ZKLocksChildReaper");
221    }
222
223}