001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.concurrent.ConcurrentMap;
021import java.util.concurrent.TimeUnit;
022
023import org.apache.curator.framework.recipes.locks.InterProcessMutex;
024import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
025import org.apache.oozie.ErrorCode;
026import org.apache.oozie.util.Instrumentable;
027import org.apache.oozie.util.Instrumentation;
028import org.apache.oozie.lock.LockToken;
029import org.apache.oozie.util.XLog;
030import org.apache.oozie.util.ZKUtils;
031
032import java.io.IOException;
033import java.util.concurrent.ScheduledExecutorService;
034
035import org.apache.curator.framework.recipes.locks.ChildReaper;
036import org.apache.curator.framework.recipes.locks.Reaper;
037import org.apache.curator.utils.ThreadUtils;
038
039import com.google.common.annotations.VisibleForTesting;
040import com.google.common.collect.MapMaker;
041
042/**
043 * Service that provides distributed locks via ZooKeeper.  Requires that a ZooKeeper ensemble is available.  The locks will be
044 * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}).  For example, with default settings, if the
045 * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo.
046 */
047public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable {
048
049    private ZKUtils zk;
050    public static final String LOCKS_NODE = "/locks";
051
052    private static final XLog LOG = XLog.getLog(ZKLocksService.class);
053    private final ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap();
054    private ChildReaper reaper = null;
055
056    private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath";
057    static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold";
058    static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads";
059
060    /**
061     * Initialize the zookeeper locks service
062     *
063     * @param services services instance.
064     */
065    @Override
066    public void init(Services services) throws ServiceException {
067        super.init(services);
068        try {
069            zk = ZKUtils.register(this);
070            reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_UNTIL_GONE, getExecutorService(),
071                    ConfigurationService.getInt(services.getConf(), REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH);
072            reaper.start();
073        }
074        catch (Exception ex) {
075            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
076        }
077    }
078
079    /**
080     * Destroy the zookeeper locks service.
081     */
082    @Override
083    public void destroy() {
084        if (reaper != null) {
085            try {
086                reaper.close();
087            }
088            catch (IOException e) {
089                LOG.error("Error closing childReaper", e);
090            }
091        }
092        if (zk != null) {
093            zk.unregister(this);
094        }
095        zk = null;
096        super.destroy();
097    }
098
099    /**
100     * Instruments the zookeeper locks service.
101     *
102     * @param instr instance to instrument the memory locks service to.
103     */
104    @Override
105    public void instrument(Instrumentation instr) {
106        // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has
107        instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() {
108            @Override
109            public Integer getValue() {
110                return zkLocks.size();
111            }
112        });
113    }
114
115    /**
116     * Obtain a READ lock for a source.
117     *
118     * @param resource resource name.
119     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
120     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
121     * @throws InterruptedException thrown if the thread was interrupted while waiting.
122     */
123    @Override
124    public LockToken getReadLock(String resource, long wait) throws InterruptedException {
125        return acquireLock(resource, Type.READ, wait);
126    }
127
128    /**
129     * Obtain a WRITE lock for a source.
130     *
131     * @param resource resource name.
132     * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait.
133     * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained.
134     * @throws InterruptedException thrown if the thread was interrupted while waiting.
135     */
136    @Override
137    public LockToken getWriteLock(String resource, long wait) throws InterruptedException {
138        return acquireLock(resource, Type.WRITE, wait);
139    }
140
141    private LockToken acquireLock(final String resource, final Type type, final long wait) throws InterruptedException {
142        LOG.debug("Acquiring ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait);
143
144        InterProcessReadWriteLock lockEntry;
145        final String zkPath = LOCKS_NODE + "/" + resource;
146        LOG.debug("Checking existing Curator lock or creating new one. [zkPath={}]", zkPath);
147
148        // Creating a Curator InterProcessReadWriteLock is lightweight - only calling acquire() costs real ZooKeeper calls
149        final InterProcessReadWriteLock newLockEntry = new InterProcessReadWriteLock(zk.getClient(), zkPath);
150        final InterProcessReadWriteLock existingLockEntry = zkLocks.putIfAbsent(resource, newLockEntry);
151        if (existingLockEntry == null) {
152            lockEntry = newLockEntry;
153            LOG.debug("No existing Curator lock present, new one created successfully. [zkPath={}]", zkPath);
154        }
155        else {
156            // We can't destoy newLockEntry and we don't have to - it's taken care of by Curator and JVM GC
157            lockEntry = existingLockEntry;
158            LOG.debug("Reusing existing Curator lock. [zkPath={}]", zkPath);
159        }
160
161        ZKLockToken token = null;
162        try {
163            LOG.debug("Calling Curator to acquire ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait);
164            final InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock();
165            if (wait == -1) {
166                lock.acquire();
167                token = new ZKLockToken(lockEntry, type);
168                LOG.debug("ZooKeeper lock acquired successfully. [resource={};type={}]", resource, type);
169            }
170            else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) {
171                token = new ZKLockToken(lockEntry, type);
172                LOG.debug("ZooKeeper lock acquired successfully waiting. [resource={};type={};wait={}]", resource, type, wait);
173            }
174            else {
175                LOG.warn("Could not acquire ZooKeeper lock, timed out. [resource={};type={};wait={}]", resource, type, wait);
176            }
177        }
178        catch (final Exception ex) {
179            //Not throwing exception. Should return null, so that command can be requeued
180            LOG.warn("Could not acquire lock due to a ZooKeeper error. " +
181                    "[ex={};resource={};type={};wait={}]", ex, resource, type, wait);
182            LOG.error("Error while acquiring lock", ex);
183        }
184
185        return token;
186    }
187
188    /**
189     * Implementation of {@link LockToken} for zookeeper locks.
190     */
191    class ZKLockToken implements LockToken {
192        private final InterProcessReadWriteLock lockEntry;
193        private final Type type;
194
195        private ZKLockToken(InterProcessReadWriteLock lockEntry, Type type) {
196            this.lockEntry = lockEntry;
197            this.type = type;
198        }
199
200        /**
201         * Release the lock.
202         */
203        @Override
204        public void release() {
205            try {
206                switch (type) {
207                    case WRITE:
208                        lockEntry.writeLock().release();
209                        break;
210                    case READ:
211                        lockEntry.readLock().release();
212                        break;
213                }
214            }
215            catch (Exception ex) {
216                LOG.warn("Could not release lock: " + ex.getMessage(), ex);
217            }
218        }
219    }
220
221    @VisibleForTesting
222    public ConcurrentMap<String, InterProcessReadWriteLock> getLocks(){
223        return zkLocks;
224    }
225
226    private static ScheduledExecutorService getExecutorService() {
227        return ThreadUtils.newFixedThreadScheduledPool(ConfigurationService.getInt(REAPING_THREADS),
228                "ZKLocksChildReaper");
229    }
230
231}