/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.framework.recipes.atomic.PromotedToLock;
import org.apache.curator.retry.RetryNTimes;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ZKUtils;

import com.google.common.annotations.VisibleForTesting;

/**
 * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available. The
 * sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}). The
 * sequence will be reset to 0, once max is reached.
 */

public class ZKUUIDService extends UUIDService {

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "ZKUUIDService.";

    public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max";
    /** Node name appended to the ZK namespace to form the promoted-lock path. */
    public static final String LOCKS_NODE = "/SEQUENCE_LOCK";

    /** Path handed to the {@link DistributedAtomicLong} that holds the sequence. */
    public static final String ZK_SEQUENCE_PATH = "/job_id_sequence";

    /** Value the distributed counter is forced to once the maximum is reached. */
    public static final long RESET_VALUE = 0L;
    /** Number of attempts made to reset the sequence before giving up. */
    public static final int RETRY_COUNT = 3;

    private final static XLog LOG = XLog.getLog(ZKUUIDService.class);

    private ZKUtils zk;
    // Static, so a value set through setMaxSequence() is shared by every instance in this JVM.
    private static long maxSequence = 9999990L;

    DistributedAtomicLong atomicIdGenerator;

    /** Per-thread formatter used to build a fresh start time when falling back to the local counter. */
    public static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() {
        @Override
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyMMddHHmmssSSS");
        }
    };


    /**
     * Initializes the parent service, registers with {@link ZKUtils} and creates the distributed counter at
     * {@link #ZK_SEQUENCE_PATH}, configured with a promoted lock located at {@link #getPromotedLock()}.
     *
     * @param services the services singleton passed to the parent initialization
     * @throws ServiceException with {@link ErrorCode#E1700} if registration or counter creation fails
     */
    @Override
    public void init(Services services) throws ServiceException {

        super.init(services);
        try {
            zk = ZKUtils.register(this);
            PromotedToLock.Builder lockBuilder = PromotedToLock.builder().lockPath(getPromotedLock())
                    .retryPolicy(getRetryPolicy()).timeout(Service.lockTimeout, TimeUnit.MILLISECONDS);
            atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, getRetryPolicy(),
                    lockBuilder.build());

        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
        }

    }

    /**
     * Creates the sequence part of a job id.
     * <p>
     * The counter value is taken from ZooKeeper and combined with the service start time. If the ZooKeeper sequence
     * cannot be obtained for any reason, the error is logged and the id is built from the parent's local counter and
     * the current time instead; this method itself never throws a checked exception.
     *
     * @return the sequence with the time appended
     */
    @Override
    protected String createSequence() {
        String localStartTime = super.startTime;
        long id = 0L;
        try {
            id = getZKSequence();
        }
        catch (Exception e) {
            LOG.error("Error getting jobId, switching to old UUIDService", e);
            id = super.getCounter();
            localStartTime = dt.get().format(new Date());
        }
        return appendTimeToSequence(id, localStartTime);
    }

    /**
     * Returns the next value of the distributed sequence, resetting the sequence first when the value obtained has
     * reached the configured maximum.
     *
     * @return the next sequence value
     * @throws Exception if the counter cannot be incremented or reset
     */
    protected synchronized long getZKSequence() throws Exception {
        long id = getDistributedSequence();

        if (id >= maxSequence) {
            resetSequence();
            id = getDistributedSequence();
        }
        return id;
    }

    /**
     * Increments the distributed counter and returns the value it held before the increment.
     *
     * @return the pre-increment value of the counter
     * @throws Exception if the generator was never created, or if the increment throws or does not succeed; when the
     *         increment throws, the original exception is kept as the cause
     */
    private long getDistributedSequence() throws Exception {
        if (atomicIdGenerator == null) {
            throw new Exception("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH);
        }
        AtomicValue<Long> value;
        try {
            value = atomicIdGenerator.increment();
        }
        catch (Exception e) {
            // Checked outside of any finally block so that this cause is not discarded.
            throw new Exception("Exception incrementing UID for session ", e);
        }
        if (value == null || !value.succeeded()) {
            throw new Exception("Exception incrementing UID for session ");
        }
        return value.preValue();
    }

    /**
     * Once sequence is reached limit, reset to 0.
     * <p>
     * Up to {@link #RETRY_COUNT} attempts are made. Each attempt returns early if the counter is already below the
     * maximum; the check is repeated after the lock is obtained, since the counter may have been reset while this
     * thread was waiting for it.
     *
     * @throws Exception if the sequence could not be reset within {@link #RETRY_COUNT} attempts, if reading the
     *         counter fails, or if the thread is interrupted while waiting (the interrupt status is restored first)
     */
    private void resetSequence() throws Exception {
        for (int attempt = 0; attempt < RETRY_COUNT; attempt++) {
            if (isSequenceBelowMax()) {
                return;
            }
            // Take the write lock so that other hosts don't reset the sequence at the same time.
            // NOTE(review): presumably the registered MemoryLocksService is ZooKeeper-backed in this setup - confirm.
            LockToken lock;
            try {
                lock = Services.get().get(MemoryLocksService.class)
                        .getWriteLock(ZKUUIDService.class.getName(), lockTimeout);
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and abort the reset.
                Thread.currentThread().interrupt();
                throw e;
            }
            if (lock == null) {
                LOG.info("Lock is held by other system, will sleep and try again");
                try {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
                continue;
            }
            try {
                if (isSequenceBelowMax()) {
                    return;
                }
                try {
                    atomicIdGenerator.forceSet(RESET_VALUE);
                }
                catch (Exception e) {
                    LOG.info("Exception while resetting sequence, will try again");
                    continue;
                }
                resetStartTime();
                return;
            }
            finally {
                // Runs on every exit path of the guarded section, including return and continue.
                lock.release();
            }
        }
        throw new Exception("Can't reset ID sequence in ZK. Retried " + RETRY_COUNT + " times");
    }

    /**
     * Reads the distributed counter and tells whether it is already below the maximum.
     *
     * @return true if the read succeeded and the value is lower than the maximum, false otherwise
     * @throws Exception if reading the counter throws
     */
    private boolean isSequenceBelowMax() throws Exception {
        AtomicValue<Long> current = atomicIdGenerator.get();
        return current.succeeded() && current.postValue() < maxSequence;
    }

    /**
     * Unregisters from {@link ZKUtils}, if registered, and destroys the parent service.
     */
    @Override
    public void destroy() {
        if (zk != null) {
            zk.unregister(this);
        }
        zk = null;
        super.destroy();
    }

    /**
     * Builds the path of the promoted lock: the ZK namespace, prefixed with "/" when it does not already start with
     * one, followed by {@link #LOCKS_NODE}.
     *
     * @return the promoted lock path
     */
    public String getPromotedLock() {
        String nameSpace = ZKUtils.getZKNameSpace();
        if (nameSpace.startsWith("/")) {
            return nameSpace + LOCKS_NODE;
        }
        return "/" + nameSpace + LOCKS_NODE;
    }

    /**
     * Overrides the value at which the sequence is reset. The value is held in a static field and therefore applies
     * to all instances.
     *
     * @param sequence the new maximum
     */
    @VisibleForTesting
    public void setMaxSequence(long sequence) {
        maxSequence = sequence;
    }

    /**
     * Retries 25 times with delay of 200ms
     *
     * @return RetryNTimes
     */
    private static RetryPolicy getRetryPolicy() {
        return new RetryNTimes(25, 200);
    }

}