001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.HashMap; 021import java.util.concurrent.TimeUnit; 022import org.apache.curator.framework.recipes.locks.InterProcessMutex; 023import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; 024import org.apache.oozie.ErrorCode; 025import org.apache.oozie.util.Instrumentable; 026import org.apache.oozie.util.Instrumentation; 027import org.apache.oozie.lock.LockToken; 028import org.apache.oozie.util.XLog; 029import org.apache.oozie.util.ZKUtils; 030import java.io.IOException; 031import java.util.concurrent.ScheduledExecutorService; 032import org.apache.curator.framework.recipes.locks.ChildReaper; 033import org.apache.curator.framework.recipes.locks.Reaper; 034import org.apache.curator.utils.ThreadUtils; 035import com.google.common.annotations.VisibleForTesting; 036 037/** 038 * Service that provides distributed locks via ZooKeeper. Requires that a ZooKeeper ensemble is available. The locks will be 039 * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}). For example, with default settings, if the 040 * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo. 041 */ 042public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable { 043 044 private ZKUtils zk; 045 private static XLog LOG = XLog.getLog(ZKLocksService.class); 046 public static final String LOCKS_NODE = "/locks"; 047 048 final private HashMap<String, InterProcessReadWriteLock> zkLocks = new HashMap<String, InterProcessReadWriteLock>(); 049 050 private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath"; 051 public static final int DEFAULT_REAPING_THRESHOLD = 300; // In sec 052 public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold"; 053 public static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads"; 054 private ChildReaper reaper = null; 055 056 /** 057 * Initialize the zookeeper locks service 058 * 059 * @param services services instance. 060 */ 061 @Override 062 public void init(Services services) throws ServiceException { 063 super.init(services); 064 try { 065 zk = ZKUtils.register(this); 066 reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_INDEFINITELY, getExecutorService(), 067 services.getConf().getInt(REAPING_THRESHOLD, DEFAULT_REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH); 068 reaper.start(); 069 } 070 catch (Exception ex) { 071 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 072 } 073 } 074 075 /** 076 * Destroy the zookeeper locks service. 077 */ 078 @Override 079 public void destroy() { 080 if (reaper != null) { 081 try { 082 reaper.close(); 083 } 084 catch (IOException e) { 085 LOG.error("Error closing childReaper", e); 086 } 087 } 088 089 if (zk != null) { 090 zk.unregister(this); 091 } 092 zk = null; 093 super.destroy(); 094 } 095 096 /** 097 * Instruments the zookeeper locks service. 098 * 099 * @param instr instance to instrument the memory locks service to. 100 */ 101 @Override 102 public void instrument(Instrumentation instr) { 103 // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has 104 instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() { 105 @Override 106 public Integer getValue() { 107 return zkLocks.size(); 108 } 109 }); 110 } 111 112 /** 113 * Obtain a READ lock for a source. 114 * 115 * @param resource resource name. 116 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 117 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 118 * @throws InterruptedException thrown if the thread was interrupted while waiting. 119 */ 120 @Override 121 public LockToken getReadLock(String resource, long wait) throws InterruptedException { 122 InterProcessReadWriteLock lockEntry; 123 synchronized (zkLocks) { 124 if (zkLocks.containsKey(resource)) { 125 lockEntry = zkLocks.get(resource); 126 } 127 else { 128 lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); 129 zkLocks.put(resource, lockEntry); 130 } 131 } 132 InterProcessMutex readLock = lockEntry.readLock(); 133 return acquireLock(wait, readLock, resource); 134 } 135 136 /** 137 * Obtain a WRITE lock for a source. 138 * 139 * @param resource resource name. 140 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 141 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 142 * @throws InterruptedException thrown if the thread was interrupted while waiting. 143 */ 144 @Override 145 public LockToken getWriteLock(String resource, long wait) throws InterruptedException { 146 InterProcessReadWriteLock lockEntry; 147 synchronized (zkLocks) { 148 if (zkLocks.containsKey(resource)) { 149 lockEntry = zkLocks.get(resource); 150 } 151 else { 152 lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); 153 zkLocks.put(resource, lockEntry); 154 } 155 } 156 InterProcessMutex writeLock = lockEntry.writeLock(); 157 return acquireLock(wait, writeLock, resource); 158 } 159 160 private LockToken acquireLock(long wait, InterProcessMutex lock, String resource) { 161 ZKLockToken token = null; 162 try { 163 if (wait == -1) { 164 lock.acquire(); 165 token = new ZKLockToken(lock, resource); 166 } 167 else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) { 168 token = new ZKLockToken(lock, resource); 169 } 170 } 171 catch (Exception ex) { 172 throw new RuntimeException(ex); 173 } 174 return token; 175 } 176 177 /** 178 * Implementation of {@link LockToken} for zookeeper locks. 179 */ 180 class ZKLockToken implements LockToken { 181 private final InterProcessMutex lock; 182 private final String resource; 183 184 private ZKLockToken(InterProcessMutex lock, String resource) { 185 this.lock = lock; 186 this.resource = resource; 187 } 188 189 /** 190 * Release the lock. 191 */ 192 @Override 193 public void release() { 194 try { 195 lock.release(); 196 int val = lock.getParticipantNodes().size(); 197 //TODO this might break, when count is zero and before we remove lock, same thread may ask for same lock. 198 // Hashmap will return the lock, but eventually release will remove it from hashmap and a immediate getlock will 199 //create a new instance. Will fix this as part of OOZIE-1922 200 if (val == 0) { 201 synchronized (zkLocks) { 202 zkLocks.remove(resource); 203 } 204 } 205 } 206 catch (Exception ex) { 207 LOG.warn("Could not release lock: " + ex.getMessage(), ex); 208 } 209 210 } 211 } 212 213 @VisibleForTesting 214 public HashMap<String, InterProcessReadWriteLock> getLocks(){ 215 return zkLocks; 216 } 217 218 private static ScheduledExecutorService getExecutorService() { 219 return ThreadUtils.newFixedThreadScheduledPool(Services.get().getConf().getInt(REAPING_THREADS, 2), 220 "ZKLocksChildReaper"); 221 } 222 223}