001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.text.SimpleDateFormat; 022import java.util.Date; 023import java.util.concurrent.TimeUnit; 024 025import org.apache.curator.RetryPolicy; 026import org.apache.curator.framework.recipes.atomic.AtomicValue; 027import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; 028import org.apache.curator.framework.recipes.atomic.PromotedToLock; 029import org.apache.curator.retry.RetryNTimes; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.lock.LockToken; 032import org.apache.oozie.util.XLog; 033import org.apache.oozie.util.ZKUtils; 034 035import com.google.common.annotations.VisibleForTesting; 036 037/** 038 * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available. The 039 * sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}). The 040 * sequence will be reset to 0, once max is reached. 041 */ 042 043public class ZKUUIDService extends UUIDService { 044 045 public static final String CONF_PREFIX = Service.CONF_PREFIX + "ZKUUIDService."; 046 047 public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max"; 048 public static final String LOCKS_NODE = "/SEQUENCE_LOCK"; 049 050 public static final String ZK_SEQUENCE_PATH = "/job_id_sequence"; 051 052 public static final long RESET_VALUE = 0L; 053 public static final int RETRY_COUNT = 3; 054 055 private final static XLog LOG = XLog.getLog(ZKUUIDService.class); 056 057 private ZKUtils zk; 058 private static Long maxSequence = 9999990L; 059 060 DistributedAtomicLong atomicIdGenerator; 061 062 public static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() { 063 @Override 064 protected SimpleDateFormat initialValue() { 065 return new SimpleDateFormat("yyMMddHHmmssSSS"); 066 } 067 }; 068 069 070 @Override 071 public void init(Services services) throws ServiceException { 072 073 super.init(services); 074 try { 075 zk = ZKUtils.register(this); 076 PromotedToLock.Builder lockBuilder = PromotedToLock.builder().lockPath(getPromotedLock()) 077 .retryPolicy(getRetryPolicy()).timeout(Service.lockTimeout, TimeUnit.MILLISECONDS); 078 atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, getRetryPolicy(), 079 lockBuilder.build()); 080 081 } 082 catch (Exception ex) { 083 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 084 } 085 086 } 087 088 /** 089 * Gets the unique id. 090 * 091 * @return the id 092 */ 093 @Override 094 protected String createSequence() { 095 String localStartTime = super.startTime; 096 long id = 0L; 097 try { 098 id = getZKSequence(); 099 } 100 catch (Exception e) { 101 LOG.error("Error getting jobId, switching to old UUIDService", e); 102 id = super.getCounter(); 103 localStartTime = dt.get().format(new Date()); 104 } 105 return appendTimeToSequence(id, localStartTime); 106 } 107 108 protected synchronized long getZKSequence() throws Exception { 109 long id = getDistributedSequence(); 110 111 if (id >= maxSequence) { 112 resetSequence(); 113 id = getDistributedSequence(); 114 } 115 return id; 116 } 117 118 @SuppressWarnings("finally") 119 private long getDistributedSequence() throws Exception { 120 if (atomicIdGenerator == null) { 121 throw new Exception("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH); 122 } 123 AtomicValue<Long> value = null; 124 try { 125 value = atomicIdGenerator.increment(); 126 } 127 catch (Exception e) { 128 throw new Exception("Exception incrementing UID for session ", e); 129 } 130 finally { 131 if (value != null && value.succeeded()) { 132 return value.preValue(); 133 } 134 else { 135 throw new Exception("Exception incrementing UID for session "); 136 } 137 } 138 } 139 140 /** 141 * Once sequence is reached limit, reset to 0. 142 * 143 * @throws Exception 144 */ 145 private void resetSequence() throws Exception { 146 for (int i = 0; i < RETRY_COUNT; i++) { 147 AtomicValue<Long> value = atomicIdGenerator.get(); 148 if (value.succeeded()) { 149 if (value.postValue() < maxSequence) { 150 return; 151 } 152 } 153 // Acquire ZK lock, so that other host doesn't reset sequence. 154 LockToken lock = null; 155 try { 156 lock = Services.get().get(MemoryLocksService.class) 157 .getWriteLock(ZKUUIDService.class.getName(), lockTimeout); 158 } 159 catch (InterruptedException e1) { 160 //ignore 161 } 162 try { 163 if (lock == null) { 164 LOG.info("Lock is held by other system, will sleep and try again"); 165 Thread.sleep(1000); 166 continue; 167 } 168 else { 169 value = atomicIdGenerator.get(); 170 if (value.succeeded()) { 171 if (value.postValue() < maxSequence) { 172 return; 173 } 174 } 175 try { 176 atomicIdGenerator.forceSet(RESET_VALUE); 177 } 178 catch (Exception e) { 179 LOG.info("Exception while resetting sequence, will try again"); 180 continue; 181 } 182 resetStartTime(); 183 return; 184 } 185 } 186 finally { 187 if (lock != null) { 188 lock.release(); 189 } 190 } 191 } 192 throw new Exception("Can't reset ID sequence in ZK. Retried " + RETRY_COUNT + " times"); 193 } 194 195 @Override 196 public void destroy() { 197 if (zk != null) { 198 zk.unregister(this); 199 } 200 zk = null; 201 super.destroy(); 202 } 203 204 public String getPromotedLock() { 205 if (ZKUtils.getZKNameSpace().startsWith("/")) { 206 return ZKUtils.getZKNameSpace() + LOCKS_NODE; 207 208 } 209 else { 210 return "/" + ZKUtils.getZKNameSpace() + LOCKS_NODE; 211 } 212 } 213 214 @VisibleForTesting 215 static void setMaxSequence(long sequence) { 216 maxSequence = sequence; 217 } 218 219 /** 220 * Retries 25 times with delay of 200ms 221 * 222 * @return RetryNTimes 223 */ 224 private static RetryPolicy getRetryPolicy() { 225 return new RetryNTimes(25, 200); 226 } 227 228}