001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.concurrent.ConcurrentMap;
021import java.util.concurrent.TimeUnit;
022
023import org.apache.curator.framework.recipes.locks.InterProcessMutex;
024import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.util.Instrumentable;
027import org.apache.oozie.util.Instrumentation;
028import org.apache.oozie.event.listener.ZKConnectionListener;
029import org.apache.oozie.lock.LockToken;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.util.ZKUtils;
032
033import java.io.IOException;
034import java.util.concurrent.ScheduledExecutorService;
035
036import org.apache.curator.framework.recipes.locks.ChildReaper;
037import org.apache.curator.framework.recipes.locks.Reaper;
038import org.apache.curator.framework.state.ConnectionState;
039import org.apache.curator.utils.ThreadUtils;
040
041import com.google.common.annotations.VisibleForTesting;
042import com.google.common.collect.MapMaker;
043
044/**
045 * Service that provides distributed locks via ZooKeeper.  Requires that a ZooKeeper ensemble is available.  The locks will be
046 * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}).  For example, with default settings, if the
047 * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo.
048 */
049public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable {
050
051    private ZKUtils zk;
052    private static XLog LOG = XLog.getLog(ZKLocksService.class);
053    public static final String LOCKS_NODE = "/locks";
054
055    private ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap();
056
057
058    private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
059    public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold";
060    public static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads";
061    private ChildReaper reaper = null;
062
063    /**
064     * Initialize the zookeeper locks service
065     *
066     * @param services services instance.
067     */
068    @Override
069    public void init(Services services) throws ServiceException {
070        super.init(services);
071        try {
072            zk = ZKUtils.register(this);
073            reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_UNTIL_GONE, getExecutorService(),
074                    ConfigurationService.getInt(services.getConf(), REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH);
075            reaper.start();
076        }
077        catch (Exception ex) {
078            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
079        }
080    }
081
082    /**
083     * Destroy the zookeeper locks service.
084     */
085    @Override
086    public void destroy() {
087        if (reaper != null) {
088            try {
089                reaper.close();
090            }
091            catch (IOException e) {
092                LOG.error("Error closing childReaper", e);
093            }
094        }
095        if (zk != null) {
096            zk.unregister(this);
097        }
098        zk = null;
099        super.destroy();
100    }
101
102    /**
103     * Instruments the zookeeper locks service.
104     *
105     * @param instr instance to instrument the memory locks service to.
106     */
107    @Override
108    public void instrument(Instrumentation instr) {
109        // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has
110        instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() {
111            @Override
112            public Integer getValue() {
113                return zkLocks.size();
114            }
115        });
116    }
117
118    /**
119     * Obtain a READ lock for a source.
120     *
121     * @param resource resource name.
122     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
123     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
124     * @throws InterruptedException thrown if the thread was interrupted while waiting.
125     */
126    @Override
127    public LockToken getReadLock(String resource, long wait) throws InterruptedException {
128        return acquireLock(resource, Type.READ, wait);
129    }
130
131    /**
132     * Obtain a WRITE lock for a source.
133     *
134     * @param resource resource name.
135     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
136     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
137     * @throws InterruptedException thrown if the thread was interrupted while waiting.
138     */
139    @Override
140    public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
141        return acquireLock(resource, Type.WRITE, wait);
142    }
143
144    private LockToken acquireLock(final String resource, Type type, long wait) throws InterruptedException {
145        InterProcessReadWriteLock lockEntry = zkLocks.get(resource);
146        if (lockEntry == null) {
147            InterProcessReadWriteLock newLock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource);
148            lockEntry = zkLocks.putIfAbsent(resource, newLock);
149            if (lockEntry == null) {
150                lockEntry = newLock;
151            }
152        }
153        InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
154        ZKLockToken token = null;
155        try {
156            if (wait == -1) {
157                lock.acquire();
158                token = new ZKLockToken(lockEntry, type);
159            }
160            else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) {
161                token = new ZKLockToken(lockEntry, type);
162            }
163        }
164        catch (Exception ex) {
165            //Not throwing exception. Should return null, so that command can be requeued
166            LOG.error("Error while acquiring lock", ex);
167        }
168        return token;
169    }
170
171    /**
172     * Implementation of {@link LockToken} for zookeeper locks.
173     */
174    class ZKLockToken implements LockToken {
175        private final InterProcessReadWriteLock lockEntry;
176        private final Type type;
177
178        private ZKLockToken(InterProcessReadWriteLock lockEntry, Type type) {
179            this.lockEntry = lockEntry;
180            this.type = type;
181        }
182
183        /**
184         * Release the lock.
185         */
186        @Override
187        public void release() {
188            try {
189                switch (type) {
190                    case WRITE:
191                        lockEntry.writeLock().release();
192                        break;
193                    case READ:
194                        lockEntry.readLock().release();
195                        break;
196                }
197            }
198            catch (Exception ex) {
199                LOG.warn("Could not release lock: " + ex.getMessage(), ex);
200            }
201        }
202    }
203
204    @VisibleForTesting
205    public ConcurrentMap<String, InterProcessReadWriteLock> getLocks(){
206        return zkLocks;
207    }
208
209    private static ScheduledExecutorService getExecutorService() {
210        return ThreadUtils.newFixedThreadScheduledPool(ConfigurationService.getInt(REAPING_THREADS),
211                "ZKLocksChildReaper");
212    }
213
214}