This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.HashMap;
021import java.util.concurrent.TimeUnit;
022
023import org.apache.curator.framework.recipes.locks.InterProcessMutex;
024import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.util.Instrumentable;
027import org.apache.oozie.util.Instrumentation;
028import org.apache.oozie.event.listener.ZKConnectionListener;
029import org.apache.oozie.lock.LockToken;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.util.ZKUtils;
032
033import java.io.IOException;
034import java.util.concurrent.ScheduledExecutorService;
035
036import org.apache.curator.framework.recipes.locks.ChildReaper;
037import org.apache.curator.framework.recipes.locks.Reaper;
038import org.apache.curator.framework.state.ConnectionState;
039import org.apache.curator.utils.ThreadUtils;
040
041import com.google.common.annotations.VisibleForTesting;
042
043/**
044 * Service that provides distributed locks via ZooKeeper.  Requires that a ZooKeeper ensemble is available.  The locks will be
045 * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}).  For example, with default settings, if the
046 * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo.
047 */
048public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable {
049
050    private ZKUtils zk;
051    private static XLog LOG = XLog.getLog(ZKLocksService.class);
052    public static final String LOCKS_NODE = "/locks";
053
054    final private HashMap<String, InterProcessReadWriteLock> zkLocks = new HashMap<String, InterProcessReadWriteLock>();
055
056    private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
057    public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold";
058    public static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads";
059    private ChildReaper reaper = null;
060
061    /**
062     * Initialize the zookeeper locks service
063     *
064     * @param services services instance.
065     */
066    @Override
067    public void init(Services services) throws ServiceException {
068        super.init(services);
069        try {
070            zk = ZKUtils.register(this);
071            reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_UNTIL_GONE, getExecutorService(),
072                    ConfigurationService.getInt(services.getConf(), REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH);
073            reaper.start();
074        }
075        catch (Exception ex) {
076            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
077        }
078    }
079
080    /**
081     * Destroy the zookeeper locks service.
082     */
083    @Override
084    public void destroy() {
085        if (reaper != null && ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
086            try {
087                reaper.close();
088            }
089            catch (IOException e) {
090                LOG.error("Error closing childReaper", e);
091            }
092        }
093        if (zk != null) {
094            zk.unregister(this);
095        }
096        zk = null;
097        super.destroy();
098    }
099
100    /**
101     * Instruments the zookeeper locks service.
102     *
103     * @param instr instance to instrument the memory locks service to.
104     */
105    @Override
106    public void instrument(Instrumentation instr) {
107        // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has
108        instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() {
109            @Override
110            public Integer getValue() {
111                return zkLocks.size();
112            }
113        });
114    }
115
116    /**
117     * Obtain a READ lock for a source.
118     *
119     * @param resource resource name.
120     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
121     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
122     * @throws InterruptedException thrown if the thread was interrupted while waiting.
123     */
124    @Override
125    public LockToken getReadLock(String resource, long wait) throws InterruptedException {
126        InterProcessReadWriteLock lockEntry;
127        synchronized (zkLocks) {
128            if (zkLocks.containsKey(resource)) {
129                lockEntry = zkLocks.get(resource);
130            }
131            else {
132                lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
133                zkLocks.put(resource, lockEntry);
134            }
135        }
136        InterProcessMutex readLock = lockEntry.readLock();
137        return acquireLock(wait, readLock, resource);
138    }
139
140    /**
141     * Obtain a WRITE lock for a source.
142     *
143     * @param resource resource name.
144     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
145     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
146     * @throws InterruptedException thrown if the thread was interrupted while waiting.
147     */
148    @Override
149    public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
150        InterProcessReadWriteLock lockEntry;
151        synchronized (zkLocks) {
152            if (zkLocks.containsKey(resource)) {
153                lockEntry = zkLocks.get(resource);
154            }
155            else {
156                lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
157                zkLocks.put(resource, lockEntry);
158            }
159        }
160        InterProcessMutex writeLock = lockEntry.writeLock();
161        return acquireLock(wait, writeLock, resource);
162    }
163
164    private LockToken acquireLock(long wait, InterProcessMutex lock, String resource) {
165        ZKLockToken token = null;
166        try {
167            if (wait == -1) {
168                lock.acquire();
169                token = new ZKLockToken(lock, resource);
170            }
171            else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) {
172                token = new ZKLockToken(lock, resource);
173            }
174        }
175        catch (Exception ex) {
176            throw new RuntimeException(ex);
177        }
178        return token;
179    }
180
181    /**
182     * Implementation of {@link LockToken} for zookeeper locks.
183     */
184    class ZKLockToken implements LockToken {
185        private final InterProcessMutex lock;
186        private final String resource;
187
188        private ZKLockToken(InterProcessMutex lock, String resource) {
189            this.lock = lock;
190            this.resource = resource;
191        }
192
193        /**
194         * Release the lock.
195         */
196        @Override
197        public void release() {
198            try {
199                lock.release();
200                int val = lock.getParticipantNodes().size();
201                //TODO this might break, when count is zero and before we remove lock, same thread may ask for same lock.
202                // Hashmap will return the lock, but eventually release will remove it from hashmap and a immediate getlock will
203                //create a new instance. Will fix this as part of OOZIE-1922
204                if (val == 0) {
205                    synchronized (zkLocks) {
206                        zkLocks.remove(resource);
207                    }
208                }
209            }
210            catch (Exception ex) {
211                LOG.warn("Could not release lock: " + ex.getMessage(), ex);
212            }
213
214        }
215    }
216
217    @VisibleForTesting
218    public HashMap<String, InterProcessReadWriteLock> getLocks(){
219        return zkLocks;
220    }
221
222    private static ScheduledExecutorService getExecutorService() {
223        return ThreadUtils.newFixedThreadScheduledPool(ConfigurationService.getInt(REAPING_THREADS),
224                "ZKLocksChildReaper");
225    }
226
227}