001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import java.text.SimpleDateFormat;
022import java.util.Date;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.curator.RetryPolicy;
026import org.apache.curator.framework.recipes.atomic.AtomicValue;
027import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
028import org.apache.curator.framework.recipes.atomic.PromotedToLock;
029import org.apache.curator.retry.RetryNTimes;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.lock.LockToken;
032import org.apache.oozie.util.XLog;
033import org.apache.oozie.util.ZKUtils;
034
035import com.google.common.annotations.VisibleForTesting;
036
037/**
038 * Service that provides distributed job id sequence via ZooKeeper. Requires that a ZooKeeper ensemble is available. The
039 * sequence path will be located under a ZNode named "job_id_sequence" under the namespace (see {@link ZKUtils}). The
040 * sequence will be reset to 0, once max is reached.
041 */
042
043public class ZKUUIDService extends UUIDService {
044
045    public static final String CONF_PREFIX = Service.CONF_PREFIX + "ZKUUIDService.";
046
047    public static final String CONF_SEQUENCE_MAX = CONF_PREFIX + "jobid.sequence.max";
048    public static final String LOCKS_NODE = "/SEQUENCE_LOCK";
049
050    public static final String ZK_SEQUENCE_PATH = "/job_id_sequence";
051
052    public static final long RESET_VALUE = 0L;
053    public static final int RETRY_COUNT = 3;
054
055    private final static XLog LOG = XLog.getLog(ZKUUIDService.class);
056
057    private ZKUtils zk;
058    private static Long maxSequence = 9999990L;
059
060    DistributedAtomicLong atomicIdGenerator;
061
062    public static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() {
063        @Override
064        protected SimpleDateFormat initialValue() {
065            return new SimpleDateFormat("yyMMddHHmmssSSS");
066        }
067    };
068
069
070    @Override
071    public void init(Services services) throws ServiceException {
072
073        super.init(services);
074        try {
075            zk = ZKUtils.register(this);
076            PromotedToLock.Builder lockBuilder = PromotedToLock.builder().lockPath(getPromotedLock())
077                    .retryPolicy(getRetryPolicy()).timeout(Service.lockTimeout, TimeUnit.MILLISECONDS);
078            atomicIdGenerator = new DistributedAtomicLong(zk.getClient(), ZK_SEQUENCE_PATH, getRetryPolicy(),
079                    lockBuilder.build());
080
081        }
082        catch (Exception ex) {
083            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
084        }
085
086    }
087
088    /**
089     * Gets the unique id.
090     *
091     * @return the id
092     */
093    @Override
094    protected String createSequence() {
095        String localStartTime = super.startTime;
096        long id = 0L;
097        try {
098            id = getZKSequence();
099        }
100        catch (Exception e) {
101            LOG.error("Error getting jobId, switching to old UUIDService", e);
102            id = super.getCounter();
103            localStartTime = dt.get().format(new Date());
104        }
105        return appendTimeToSequence(id, localStartTime);
106    }
107
108    protected synchronized long getZKSequence() throws Exception {
109        long id = getDistributedSequence();
110
111        if (id >= maxSequence) {
112            resetSequence();
113            id = getDistributedSequence();
114        }
115        return id;
116    }
117
118    @SuppressWarnings("finally")
119    private long getDistributedSequence() throws Exception {
120        if (atomicIdGenerator == null) {
121            throw new Exception("Sequence generator can't be null. Path : " + ZK_SEQUENCE_PATH);
122        }
123        AtomicValue<Long> value = null;
124        try {
125            value = atomicIdGenerator.increment();
126        }
127        catch (Exception e) {
128            throw new Exception("Exception incrementing UID for session ", e);
129        }
130        finally {
131            if (value != null && value.succeeded()) {
132                return value.preValue();
133            }
134            else {
135                throw new Exception("Exception incrementing UID for session ");
136            }
137        }
138    }
139
140    /**
141     * Once sequence is reached limit, reset to 0.
142     *
143     * @throws Exception
144     */
145    private  void resetSequence() throws Exception {
146        for (int i = 0; i < RETRY_COUNT; i++) {
147            AtomicValue<Long> value = atomicIdGenerator.get();
148            if (value.succeeded()) {
149                if (value.postValue() < maxSequence) {
150                    return;
151                }
152            }
153            // Acquire ZK lock, so that other host doesn't reset sequence.
154            LockToken lock = null;
155            try {
156                lock = Services.get().get(MemoryLocksService.class)
157                        .getWriteLock(ZKUUIDService.class.getName(), lockTimeout);
158            }
159            catch (InterruptedException e1) {
160                //ignore
161            }
162            try {
163                if (lock == null) {
164                    LOG.info("Lock is held by other system, will sleep and try again");
165                    Thread.sleep(1000);
166                    continue;
167                }
168                else {
169                    value = atomicIdGenerator.get();
170                    if (value.succeeded()) {
171                        if (value.postValue() < maxSequence) {
172                            return;
173                        }
174                    }
175                    try {
176                        atomicIdGenerator.forceSet(RESET_VALUE);
177                    }
178                    catch (Exception e) {
179                        LOG.info("Exception while resetting sequence, will try again");
180                        continue;
181                    }
182                    resetStartTime();
183                    return;
184                }
185            }
186            finally {
187                if (lock != null) {
188                    lock.release();
189                }
190            }
191        }
192        throw new Exception("Can't reset ID sequence in ZK. Retried " + RETRY_COUNT + " times");
193    }
194
195    @Override
196    public void destroy() {
197        if (zk != null) {
198            zk.unregister(this);
199        }
200        zk = null;
201        super.destroy();
202    }
203
204    public String getPromotedLock() {
205        if (ZKUtils.getZKNameSpace().startsWith("/")) {
206            return ZKUtils.getZKNameSpace() + LOCKS_NODE;
207
208        }
209        else {
210            return "/" + ZKUtils.getZKNameSpace() + LOCKS_NODE;
211        }
212    }
213
214    @VisibleForTesting
215    static void setMaxSequence(long sequence) {
216        maxSequence = sequence;
217    }
218
219    /**
220     * Retries 25 times with delay of 200ms
221     *
222     * @return RetryNTimes
223     */
224    private static RetryPolicy getRetryPolicy() {
225        return new RetryNTimes(25, 200);
226    }
227
228}