001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import org.apache.oozie.util.Instrumentable; 022import org.apache.oozie.util.Instrumentation; 023import org.apache.oozie.lock.LockToken; 024import org.apache.oozie.lock.MemoryLocks; 025 026import com.google.common.annotations.VisibleForTesting; 027import org.apache.oozie.util.XLog; 028 029/** 030 * Service that provides in-memory locks. Assumes no other Oozie servers are using the database. 031 */ 032public class MemoryLocksService implements Service, Instrumentable { 033 034 private static final XLog LOG = XLog.getLog(MemoryLocksService.class); 035 036 public static enum Type { 037 READ, WRITE 038 } 039 040 protected static final String INSTRUMENTATION_GROUP = "locks"; 041 private MemoryLocks locks; 042 043 /** 044 * Initialize the memory locks service 045 * 046 * @param services services instance. 047 */ 048 @Override 049 public void init(Services services) throws ServiceException { 050 locks = new MemoryLocks(); 051 } 052 053 /** 054 * Destroy the memory locks service. 055 */ 056 @Override 057 public void destroy() { 058 locks = null; 059 } 060 061 /** 062 * Return the public interface for the memory locks services 063 * 064 * @return {@link MemoryLocksService}. 065 */ 066 @Override 067 public Class<? extends Service> getInterface() { 068 return MemoryLocksService.class; 069 } 070 071 /** 072 * Instruments the memory locks service. 073 * 074 * @param instr instance to instrument the memory locks service to. 075 */ 076 public void instrument(Instrumentation instr) { 077 final MemoryLocks finalLocks = this.locks; 078 instr.addVariable(INSTRUMENTATION_GROUP, "locks", new Instrumentation.Variable<Long>() { 079 public Long getValue() { 080 return (long) finalLocks.size(); 081 } 082 }); 083 } 084 085 /** 086 * Obtain a READ lock for a source. 087 * 088 * @param resource resource name. 089 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 090 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 091 * @throws InterruptedException thrown if the thread was interrupted while waiting. 092 */ 093 public LockToken getReadLock(final String resource, final long wait) throws InterruptedException { 094 LOG.trace("Acquiring in-memory read lock. [resource={0};wait={1}]", resource, wait); 095 return locks.getLock(resource, Type.READ, wait); 096 } 097 098 /** 099 * Obtain a WRITE lock for a source. 100 * 101 * @param resource resource name. 102 * @param wait time out in milliseconds to wait for the lock, -1 means no timeout and 0 no wait. 103 * @return the lock token for the resource, or <code>null</code> if the lock could not be obtained. 104 * @throws InterruptedException thrown if the thread was interrupted while waiting. 105 */ 106 public LockToken getWriteLock(final String resource, final long wait) throws InterruptedException { 107 LOG.trace("Acquiring in-memory write lock. [resource={0};wait={1}]", resource, wait); 108 return locks.getLock(resource, Type.WRITE, wait); 109 } 110 111 @VisibleForTesting 112 public MemoryLocks getMemoryLocks() { 113 return locks; 114 } 115}