001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.concurrent.ConcurrentMap; 021import java.util.concurrent.TimeUnit; 022 023import org.apache.curator.framework.recipes.locks.InterProcessMutex; 024import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.util.Instrumentable; 027import org.apache.oozie.util.Instrumentation; 028import org.apache.oozie.event.listener.ZKConnectionListener; 029import org.apache.oozie.lock.LockToken; 030import org.apache.oozie.util.XLog; 031import org.apache.oozie.util.ZKUtils; 032 033import java.io.IOException; 034import java.util.concurrent.ScheduledExecutorService; 035 036import org.apache.curator.framework.recipes.locks.ChildReaper; 037import org.apache.curator.framework.recipes.locks.Reaper; 038import org.apache.curator.framework.state.ConnectionState; 039import org.apache.curator.utils.ThreadUtils; 040 041import com.google.common.annotations.VisibleForTesting; 042import com.google.common.collect.MapMaker; 043 044/** 045 * Service that provides distributed locks via ZooKeeper. Requires that a ZooKeeper ensemble is available. The locks will be 046 * located under a ZNode named "locks" under the namespace (see {@link ZKUtils}). For example, with default settings, if the 047 * resource we're locking is called "foo", then the ZNode backing the lock will be at /oozie/locks/foo. 048 */ 049public class ZKLocksService extends MemoryLocksService implements Service, Instrumentable { 050 051 private ZKUtils zk; 052 private static XLog LOG = XLog.getLog(ZKLocksService.class); 053 public static final String LOCKS_NODE = "/locks"; 054 055 private ConcurrentMap<String, InterProcessReadWriteLock> zkLocks = new MapMaker().weakValues().makeMap(); 056 057 058 private static final String REAPING_LEADER_PATH = ZKUtils.ZK_BASE_SERVICES_PATH + "/locksChildReaperLeaderPath"; 059 public static final String REAPING_THRESHOLD = CONF_PREFIX + "ZKLocksService.locks.reaper.threshold"; 060 public static final String REAPING_THREADS = CONF_PREFIX + "ZKLocksService.locks.reaper.threads"; 061 private ChildReaper reaper = null; 062 063 /** 064 * Initialize the zookeeper locks service 065 * 066 * @param services services instance. 067 */ 068 @Override 069 public void init(Services services) throws ServiceException { 070 super.init(services); 071 try { 072 zk = ZKUtils.register(this); 073 reaper = new ChildReaper(zk.getClient(), LOCKS_NODE, Reaper.Mode.REAP_UNTIL_GONE, getExecutorService(), 074 ConfigurationService.getInt(services.getConf(), REAPING_THRESHOLD) * 1000, REAPING_LEADER_PATH); 075 reaper.start(); 076 } 077 catch (Exception ex) { 078 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 079 } 080 } 081 082 /** 083 * Destroy the zookeeper locks service. 084 */ 085 @Override 086 public void destroy() { 087 if (reaper != null) { 088 try { 089 reaper.close(); 090 } 091 catch (IOException e) { 092 LOG.error("Error closing childReaper", e); 093 } 094 } 095 if (zk != null) { 096 zk.unregister(this); 097 } 098 zk = null; 099 super.destroy(); 100 } 101 102 /** 103 * Instruments the zookeeper locks service. 104 * 105 * @param instr instance to instrument the memory locks service to. 106 */ 107 @Override 108 public void instrument(Instrumentation instr) { 109 // Similar to MemoryLocksService's instrumentation, though this is only the number of locks this Oozie server currently has 110 instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Integer>() { 111 @Override 112 public Integer getValue() { 113 return zkLocks.size(); 114 } 115 }); 116 } 117 118 /** 119 * Obtain a READ lock for a source. 120 * 121 * @param resource resource name. 122 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 123 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 124 * @throws InterruptedException thrown if the thread was interrupted while waiting. 125 */ 126 @Override 127 public LockToken getReadLock(String resource, long wait) throws InterruptedException { 128 return acquireLock(resource, Type.READ, wait); 129 } 130 131 /** 132 * Obtain a WRITE lock for a source. 133 * 134 * @param resource resource name. 135 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 136 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 137 * @throws InterruptedException thrown if the thread was interrupted while waiting. 138 */ 139 @Override 140 public LockToken getWriteLock(String resource, long wait) throws InterruptedException { 141 return acquireLock(resource, Type.WRITE, wait); 142 } 143 144 private LockToken acquireLock(final String resource, Type type, long wait) throws InterruptedException { 145 InterProcessReadWriteLock lockEntry = zkLocks.get(resource); 146 if (lockEntry == null) { 147 InterProcessReadWriteLock newLock = new InterProcessReadWriteLock(zk.getClient(), LOCKS_NODE + "/" + resource); 148 lockEntry = zkLocks.putIfAbsent(resource, newLock); 149 if (lockEntry == null) { 150 lockEntry = newLock; 151 } 152 } 153 InterProcessMutex lock = (type.equals(Type.READ)) ? lockEntry.readLock() : lockEntry.writeLock(); 154 ZKLockToken token = null; 155 try { 156 if (wait == -1) { 157 lock.acquire(); 158 token = new ZKLockToken(lockEntry, type); 159 } 160 else if (lock.acquire(wait, TimeUnit.MILLISECONDS)) { 161 token = new ZKLockToken(lockEntry, type); 162 } 163 } 164 catch (Exception ex) { 165 //Not throwing exception. Should return null, so that command can be requeued 166 LOG.error("Error while acquiring lock", ex); 167 } 168 return token; 169 } 170 171 /** 172 * Implementation of {@link LockToken} for zookeeper locks. 173 */ 174 class ZKLockToken implements LockToken { 175 private final InterProcessReadWriteLock lockEntry; 176 private final Type type; 177 178 private ZKLockToken(InterProcessReadWriteLock lockEntry, Type type) { 179 this.lockEntry = lockEntry; 180 this.type = type; 181 } 182 183 /** 184 * Release the lock. 185 */ 186 @Override 187 public void release() { 188 try { 189 switch (type) { 190 case WRITE: 191 lockEntry.writeLock().release(); 192 break; 193 case READ: 194 lockEntry.readLock().release(); 195 break; 196 } 197 } 198 catch (Exception ex) { 199 LOG.warn("Could not release lock: " + ex.getMessage(), ex); 200 } 201 } 202 } 203 204 @VisibleForTesting 205 public ConcurrentMap<String, InterProcessReadWriteLock> getLocks(){ 206 return zkLocks; 207 } 208 209 private static ScheduledExecutorService getExecutorService() { 210 return ThreadUtils.newFixedThreadScheduledPool(ConfigurationService.getInt(REAPING_THREADS), 211 "ZKLocksChildReaper"); 212 } 213 214}