001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.concurrent.ConcurrentMap; 021import java.util.concurrent.TimeUnit; 022 023import org.apache.curator.framework.recipes.locks.InterProcessMutex; 024import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.util.Instrumentable; 027import org.apache.oozie.util.Instrumentation; 028import org.apache.oozie.lock.LockToken; 029import org.apache.oozie.util.XLog; 030import org.apache.oozie.util.ZKUtils; 031 032import java.io.IOException; 033import java.util.concurrent.ScheduledExecutorService; 034 035import org.apache.curator.framework.recipes.locks.ChildReaper; 036import org.apache.curator.framework.recipes.locks.Reaper; 037import org.apache.curator.utils.ThreadUtils; 038 039import com.google.common.annotations.VisibleForTesting; 040import com.google.common.collect.MapMaker; 041 042/** 043 * Service that provides distributed locks via ZooKeeper. Requires that a ZooKeeper ensemble is available. The locks will be 044 * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}). For example, with default settings, if the 045 * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo. 046 */ 047public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable { 048 049 private ZKUtils zk; 050 public static final String LOCKS_NODE = "/locks"; 051 052 private static final XLog LOG = XLog.getLog(ZKLocksService.class); 053 private final ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap(); 054 private ChildReaper reaper = null; 055 056 private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath"; 057 static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold"; 058 static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads"; 059 060 /** 061 * Initialize the zookeeper locks service 062 * 063 * @param services services instance. 064 */ 065 @Override 066 public void init(Services services) throws ServiceException { 067 super.init(services); 068 try { 069 zk = ZKUtils.register(this); 070 reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_UNTIL_GONE, getExecutorService(), 071 ConfigurationService.getInt(services.getConf(), REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH); 072 reaper.start(); 073 } 074 catch (Exception ex) { 075 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 076 } 077 } 078 079 /** 080 * Destroy the zookeeper locks service. 081 */ 082 @Override 083 public void destroy() { 084 if (reaper != null) { 085 try { 086 reaper.close(); 087 } 088 catch (IOException e) { 089 LOG.error("Error closing childReaper", e); 090 } 091 } 092 if (zk != null) { 093 zk.unregister(this); 094 } 095 zk = null; 096 super.destroy(); 097 } 098 099 /** 100 * Instruments the zookeeper locks service. 101 * 102 * @param instr instance to instrument the memory locks service to. 103 */ 104 @Override 105 public void instrument(Instrumentation instr) { 106 // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has 107 instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() { 108 @Override 109 public Integer getValue() { 110 return zkLocks.size(); 111 } 112 }); 113 } 114 115 /** 116 * Obtain a READ lock for a source. 117 * 118 * @param resource resource name. 119 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 120 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 121 * @throws InterruptedException thrown if the thread was interrupted while waiting. 122 */ 123 @Override 124 public LockToken getReadLock(String resource, long wait) throws InterruptedException { 125 return acquireLock(resource, Type.READ, wait); 126 } 127 128 /** 129 * Obtain a WRITE lock for a source. 130 * 131 * @param resource resource name. 132 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 133 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 134 * @throws InterruptedException thrown if the thread was interrupted while waiting. 135 */ 136 @Override 137 public LockToken getWriteLock(String resource, long wait) throws InterruptedException { 138 return acquireLock(resource, Type.WRITE, wait); 139 } 140 141 private LockToken acquireLock(final String resource, final Type type, final long wait) throws InterruptedException { 142 LOG.debug("Acquiring ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait); 143 144 InterProcessReadWriteLock lockEntry; 145 final String zkPath = LOCKS_NODE + "/" + resource; 146 LOG.debug("Checking existing Curator lock or creating new one. [zkPath={}]", zkPath); 147 148 // Creating a Curator InterProcessReadWriteLock is lightweight - only calling acquire() costs real ZooKeeper calls 149 final InterProcessReadWriteLock newLockEntry = new InterProcessReadWriteLock(zk.getClient(), zkPath); 150 final InterProcessReadWriteLock existingLockEntry = zkLocks.putIfAbsent(resource, newLockEntry); 151 if (existingLockEntry == null) { 152 lockEntry = newLockEntry; 153 LOG.debug("No existing Curator lock present, new one created successfully. [zkPath={}]", zkPath); 154 } 155 else { 156 // We can't destoy newLockEntry and we don't have to - it's taken care of by Curator and JVM GC 157 lockEntry = existingLockEntry; 158 LOG.debug("Reusing existing Curator lock. [zkPath={}]", zkPath); 159 } 160 161 ZKLockToken token = null; 162 try { 163 LOG.debug("Calling Curator to acquire ZooKeeper lock. [resource={};type={};wait={}]", resource, type, wait); 164 final InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock(); 165 if (wait == -1) { 166 lock.acquire(); 167 token = new ZKLockToken(lockEntry, type); 168 LOG.debug("ZooKeeper lock acquired successfully. [resource={};type={}]", resource, type); 169 } 170 else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) { 171 token = new ZKLockToken(lockEntry, type); 172 LOG.debug("ZooKeeper lock acquired successfully waiting. [resource={};type={};wait={}]", resource, type, wait); 173 } 174 else { 175 LOG.warn("Could not acquire ZooKeeper lock, timed out. [resource={};type={};wait={}]", resource, type, wait); 176 } 177 } 178 catch (final Exception ex) { 179 //Not throwing exception. Should return null, so that command can be requeued 180 LOG.warn("Could not acquire lock due to a ZooKeeper error. " + 181 "[ex={};resource={};type={};wait={}]", ex, resource, type, wait); 182 LOG.error("Error while acquiring lock", ex); 183 } 184 185 return token; 186 } 187 188 /** 189 * Implementation of {@link LockToken} for zookeeper locks. 190 */ 191 class ZKLockToken implements LockToken { 192 private final InterProcessReadWriteLock lockEntry; 193 private final Type type; 194 195 private ZKLockToken(InterProcessReadWriteLock lockEntry, Type type) { 196 this.lockEntry = lockEntry; 197 this.type = type; 198 } 199 200 /** 201 * Release the lock. 202 */ 203 @Override 204 public void release() { 205 try { 206 switch (type) { 207 case WRITE: 208 lockEntry.writeLock().release(); 209 break; 210 case READ: 211 lockEntry.readLock().release(); 212 break; 213 } 214 } 215 catch (Exception ex) { 216 LOG.warn("Could not release lock: " + ex.getMessage(), ex); 217 } 218 } 219 } 220 221 @VisibleForTesting 222 public ConcurrentMap<String, InterProcessReadWriteLock> getLocks(){ 223 return zkLocks; 224 } 225 226 private static ScheduledExecutorService getExecutorService() { 227 return ThreadUtils.newFixedThreadScheduledPool(ConfigurationService.getInt(REAPING_THREADS), 228 "ZKLocksChildReaper"); 229 } 230 231}