001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.HashMap; 021import java.util.concurrent.TimeUnit; 022 023import org.apache.curator.framework.recipes.locks.InterProcessMutex; 024import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.util.Instrumentable; 027import org.apache.oozie.util.Instrumentation; 028import org.apache.oozie.event.listener.ZKConnectionListener; 029import org.apache.oozie.lock.LockToken; 030import org.apache.oozie.util.XLog; 031import org.apache.oozie.util.ZKUtils; 032 033import java.io.IOException; 034import java.util.concurrent.ScheduledExecutorService; 035 036import org.apache.curator.framework.recipes.locks.ChildReaper; 037import org.apache.curator.framework.recipes.locks.Reaper; 038import org.apache.curator.framework.state.ConnectionState; 039import org.apache.curator.utils.ThreadUtils; 040 041import com.google.common.annotations.VisibleForTesting; 042 043/** 044 * Service that provides distributed locks via ZooKeeper. Requires that a ZooKeeper ensemble is available. The locks will be 045 * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}). For example, with default settings, if the 046 * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo. 047 */ 048public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable { 049 050 private ZKUtils zk; 051 private static XLog LOG = XLog.getLog(ZKLocksService.class); 052 public static final String LOCKS_NODE = "/locks"; 053 054 final private HashMap<String, InterProcessReadWriteLock> zkLocks = new HashMap<String, InterProcessReadWriteLock>(); 055 056 private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath"; 057 public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold"; 058 public static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads"; 059 private ChildReaper reaper = null; 060 061 /** 062 * Initialize the zookeeper locks service 063 * 064 * @param services services instance. 065 */ 066 @Override 067 public void init(Services services) throws ServiceException { 068 super.init(services); 069 try { 070 zk = ZKUtils.register(this); 071 reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_UNTIL_GONE, getExecutorService(), 072 ConfigurationService.getInt(services.getConf(), REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH); 073 reaper.start(); 074 } 075 catch (Exception ex) { 076 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 077 } 078 } 079 080 /** 081 * Destroy the zookeeper locks service. 082 */ 083 @Override 084 public void destroy() { 085 if (reaper != null && ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) { 086 try { 087 reaper.close(); 088 } 089 catch (IOException e) { 090 LOG.error("Error closing childReaper", e); 091 } 092 } 093 if (zk != null) { 094 zk.unregister(this); 095 } 096 zk = null; 097 super.destroy(); 098 } 099 100 /** 101 * Instruments the zookeeper locks service. 102 * 103 * @param instr instance to instrument the memory locks service to. 104 */ 105 @Override 106 public void instrument(Instrumentation instr) { 107 // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has 108 instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() { 109 @Override 110 public Integer getValue() { 111 return zkLocks.size(); 112 } 113 }); 114 } 115 116 /** 117 * Obtain a READ lock for a source. 118 * 119 * @param resource resource name. 120 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 121 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 122 * @throws InterruptedException thrown if the thread was interrupted while waiting. 123 */ 124 @Override 125 public LockToken getReadLock(String resource, long wait) throws InterruptedException { 126 InterProcessReadWriteLock lockEntry; 127 synchronized (zkLocks) { 128 if (zkLocks.containsKey(resource)) { 129 lockEntry = zkLocks.get(resource); 130 } 131 else { 132 lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); 133 zkLocks.put(resource, lockEntry); 134 } 135 } 136 InterProcessMutex readLock = lockEntry.readLock(); 137 return acquireLock(wait, readLock, resource); 138 } 139 140 /** 141 * Obtain a WRITE lock for a source. 142 * 143 * @param resource resource name. 144 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 145 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 146 * @throws InterruptedException thrown if the thread was interrupted while waiting. 147 */ 148 @Override 149 public LockToken getWriteLock(String resource, long wait) throws InterruptedException { 150 InterProcessReadWriteLock lockEntry; 151 synchronized (zkLocks) { 152 if (zkLocks.containsKey(resource)) { 153 lockEntry = zkLocks.get(resource); 154 } 155 else { 156 lockEntry = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); 157 zkLocks.put(resource, lockEntry); 158 } 159 } 160 InterProcessMutex writeLock = lockEntry.writeLock(); 161 return acquireLock(wait, writeLock, resource); 162 } 163 164 private LockToken acquireLock(long wait, InterProcessMutex lock, String resource) { 165 ZKLockToken token = null; 166 try { 167 if (wait == -1) { 168 lock.acquire(); 169 token = new ZKLockToken(lock, resource); 170 } 171 else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) { 172 token = new ZKLockToken(lock, resource); 173 } 174 } 175 catch (Exception ex) { 176 throw new RuntimeException(ex); 177 } 178 return token; 179 } 180 181 /** 182 * Implementation of {@link LockToken} for zookeeper locks. 183 */ 184 class ZKLockToken implements LockToken { 185 private final InterProcessMutex lock; 186 private final String resource; 187 188 private ZKLockToken(InterProcessMutex lock, String resource) { 189 this.lock = lock; 190 this.resource = resource; 191 } 192 193 /** 194 * Release the lock. 195 */ 196 @Override 197 public void release() { 198 try { 199 lock.release(); 200 int val = lock.getParticipantNodes().size(); 201 //TODO this might break, when count is zero and before we remove lock, same thread may ask for same lock. 202 // Hashmap will return the lock, but eventually release will remove it from hashmap and a immediate getlock will 203 //create a new instance. Will fix this as part of OOZIE-1922 204 if (val == 0) { 205 synchronized (zkLocks) { 206 zkLocks.remove(resource); 207 } 208 } 209 } 210 catch (Exception ex) { 211 LOG.warn("Could not release lock: " + ex.getMessage(), ex); 212 } 213 214 } 215 } 216 217 @VisibleForTesting 218 public HashMap<String, InterProcessReadWriteLock> getLocks(){ 219 return zkLocks; 220 } 221 222 private static ScheduledExecutorService getExecutorService() { 223 return ThreadUtils.newFixedThreadScheduledPool(ConfigurationService.getInt(REAPING_THREADS), 224 "ZKLocksChildReaper"); 225 } 226 227}