001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.text.SimpleDateFormat;
022import java.util.Date;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.curator.RetryPolicy;
026import org.apache.curator.framework.recipes.atomic.AtomicValue;
027import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
028import org.apache.curator.framework.recipes.atomic.PromotedToLock;
029import org.apache.curator.retry.RetryNTimes;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.lock.LockToken;
032import org.apache.oozie.util.XLog;
033import org.apache.oozie.util.ZKUtils;
034
035import com.google.common.annotations.VisibleForTesting;
036
037/**
038 * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available. The
039 * sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}). The
040 * sequence will be reset to 0, once max is reached.
041 */
042
043public class ZKUUIDService extends UUIDService {
044
045    public static final String CONF_PREFIX = Service.CONF_PREFIX + "ZKUUIDService.";
046
047    public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max";
048    public static final String LOCKS_NODE = "/SEQUENCE_LOCK";
049
050    public static final String ZK_SEQUENCE_PATH = "job_id_sequence";
051
052    public static final long RESET_VALUE = 0L;
053    public static final int RETRY_COUNT = 3;
054
055    private final static XLog LOG = XLog.getLog(ZKUUIDService.class);
056
057    private ZKUtils zk;
058    private static Long maxSequence = 9999990L;
059
060    DistributedAtomicLong atomicIdGenerator;
061
062    public static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() {
063        @Override
064        protected SimpleDateFormat initialValue() {
065            return new SimpleDateFormat("yyMMddHHmmssSSS");
066        }
067    };
068
069
070    @Override
071    public void init(Services services) throws ServiceException {
072
073        super.init(services);
074        try {
075            zk = ZKUtils.register(this);
076            PromotedToLock.Builder lockBuilder = PromotedToLock.builder().lockPath(getPromotedLock())
077                    .retryPolicy(getRetryPolicy()).timeout(Service.lockTimeout, TimeUnit.MILLISECONDS);
078            atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, getRetryPolicy(),
079                    lockBuilder.build());
080
081        }
082        catch (Exception ex) {
083            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
084        }
085
086    }
087
088    /**
089     * Gets the unique id.
090     *
091     * @return the id
092     * @throws Exception the exception
093     */
094    @Override
095    protected String createSequence() {
096        String localStartTime = super.startTime;
097        long id = 0L;
098        try {
099            id = getZKSequence();
100        }
101        catch (Exception e) {
102            LOG.error("Error getting jobId, switching to old UUIDService", e);
103            id = super.getCounter();
104            localStartTime = dt.get().format(new Date());
105        }
106        return appendTimeToSequence(id, localStartTime);
107    }
108
109    protected synchronized long getZKSequence() throws Exception {
110        long id = getDistributedSequence();
111
112        if (id >= maxSequence) {
113            resetSequence();
114            id = getDistributedSequence();
115        }
116        return id;
117    }
118
119    @SuppressWarnings("finally")
120    private long getDistributedSequence() throws Exception {
121        if (atomicIdGenerator == null) {
122            throw new Exception("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH);
123        }
124        AtomicValue<Long> value = null;
125        try {
126            value = atomicIdGenerator.increment();
127        }
128        catch (Exception e) {
129            throw new Exception("Exception incrementing UID for session ", e);
130        }
131        finally {
132            if (value != null && value.succeeded()) {
133                return value.preValue();
134            }
135            else {
136                throw new Exception("Exception incrementing UID for session ");
137            }
138        }
139    }
140
141    /**
142     * Once sequence is reached limit, reset to 0.
143     *
144     * @throws Exception
145     */
146    private  void resetSequence() throws Exception {
147        for (int i = 0; i < RETRY_COUNT; i++) {
148            AtomicValue<Long> value = atomicIdGenerator.get();
149            if (value.succeeded()) {
150                if (value.postValue() < maxSequence) {
151                    return;
152                }
153            }
154            // Acquire ZK lock, so that other host doesn't reset sequence.
155            LockToken lock = null;
156            try {
157                lock = Services.get().get(MemoryLocksService.class)
158                        .getWriteLock(ZKUUIDService.class.getName(), lockTimeout);
159            }
160            catch (InterruptedException e1) {
161                //ignore
162            }
163            try {
164                if (lock == null) {
165                    LOG.info("Lock is held by other system, will sleep and try again");
166                    Thread.sleep(1000);
167                    continue;
168                }
169                else {
170                    value = atomicIdGenerator.get();
171                    if (value.succeeded()) {
172                        if (value.postValue() < maxSequence) {
173                            return;
174                        }
175                    }
176                    try {
177                        atomicIdGenerator.forceSet(RESET_VALUE);
178                    }
179                    catch (Exception e) {
180                        LOG.info("Exception while resetting sequence, will try again");
181                        continue;
182                    }
183                    resetStartTime();
184                    return;
185                }
186            }
187            finally {
188                if (lock != null) {
189                    lock.release();
190                }
191            }
192        }
193        throw new Exception("Can't reset ID sequence in ZK. Retried " + RETRY_COUNT + " times");
194    }
195
196    @Override
197    public void destroy() {
198        if (zk != null) {
199            zk.unregister(this);
200        }
201        zk = null;
202        super.destroy();
203    }
204
205    public String getPromotedLock() {
206        if (ZKUtils.getZKNameSpace().startsWith("/")) {
207            return ZKUtils.getZKNameSpace() + LOCKS_NODE;
208
209        }
210        else {
211            return "/" + ZKUtils.getZKNameSpace() + LOCKS_NODE;
212        }
213    }
214
215    @VisibleForTesting
216    public void setMaxSequence(long sequence) {
217        maxSequence = sequence;
218    }
219
220    /**
221     * Retries 25 times with delay of 200ms
222     *
223     * @return RetryNTimes
224     */
225    private static RetryPolicy getRetryPolicy() {
226        return new RetryNTimes(25, 200);
227    }
228
229}