This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import com.google.common.annotations.VisibleForTesting;
021
022import java.io.IOException;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import javax.security.auth.login.Configuration;
031
032import org.apache.curator.RetryPolicy;
033import org.apache.curator.framework.CuratorFramework;
034import org.apache.curator.framework.CuratorFrameworkFactory;
035import org.apache.curator.framework.api.ACLProvider;
036import org.apache.curator.framework.imps.DefaultACLProvider;
037import org.apache.curator.framework.state.ConnectionState;
038import org.apache.curator.retry.ExponentialBackoffRetry;
039import org.apache.curator.utils.EnsurePath;
040import org.apache.curator.x.discovery.ServiceCache;
041import org.apache.curator.x.discovery.ServiceDiscovery;
042import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
043import org.apache.curator.x.discovery.ServiceInstance;
044import org.apache.curator.x.discovery.details.InstanceSerializer;
045import org.apache.oozie.ErrorCode;
046
047import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_KEYTAB;
048import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_PRINCIPAL;
049
050import org.apache.oozie.event.listener.ZKConnectionListener;
051import org.apache.oozie.service.ConfigurationService;
052import org.apache.oozie.service.ServiceException;
053import org.apache.oozie.service.Services;
054import org.apache.zookeeper.ZooDefs.Perms;
055import org.apache.zookeeper.client.ZooKeeperSaslClient;
056import org.apache.zookeeper.data.ACL;
057import org.apache.zookeeper.data.Id;
058import org.apache.zookeeper.data.Stat;
059
060/**
061 * This class provides a singleton for interacting with ZooKeeper that other classes can use.  It handles connecting to ZooKeeper,
062 * service discovery, and publishing metadata about this server.
063 * <p>
064 * Users of this class should call {@link ZKUtils#register(java.lang.Object)} to obtain the singleton.  This will ensure that we're
065 * properly connected and ready to go with ZooKeeper.  When the user is done (i.e. on shutdown), it should call
066 * {@link ZKUtils#unregister(java.lang.Object)} to let this class know; once there are no more users, this class will automatically
067 * remove itself from ZooKeeper.
068 * <p>
069 * Each Oozie Server provides metadata that can be shared with the other Oozie Servers.  To keep things simple and to make it easy
070 * to add additional metadata in the future, we share a Map.  They keys are defined in {@link ZKMetadataKeys}.
071 * <p>
072 * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is
073 * /oozie/services/).  ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named
074 * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload.  For example, with the default settings,
075 * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata.
076 * <p>
077 * If oozie.zookeeper.secure is set to true, then Oozie will (a) use jaas to connect to ZooKeeper using SASL/Kerberos based on
078 * Oozie's existing security configuration parameters (b) use/convert every znode under the namespace (including the namespace
079 * itself) to have ACLs such that only Oozie servers have access (i.e. if "service/host@REALM" is the Kerberos principal, then
080 * "service" will be used for the ACLs).
081 * <p>
082 * Oozie server will shutdown itself if ZK connection is lost for ${ZK_CONNECTION_TIMEOUT}.
083 */
084public class ZKUtils {
085    /**
086     * oozie-site property for specifying the ZooKeeper connection string.  Comma-separated values of host:port pairs of the
087     * ZooKeeper servers.
088     */
089    public static final String ZK_CONNECTION_STRING = "oozie.zookeeper.connection.string";
090    /**
091     * oozie-site property for specifying the ZooKeeper namespace to use (e.g. "oozie").  All of the Oozie servers that are planning
092     * on talking to each other should have the same value for this.
093     */
094    public static final String ZK_NAMESPACE = "oozie.zookeeper.namespace";
095
096    /**
097     *Default ZK connection timeout ( in sec). If connection is lost for more than timeout, then Oozie server will shutdown itself.
098     */
099    public static final String ZK_CONNECTION_TIMEOUT = "oozie.zookeeper.connection.timeout";
100
101    /**
102     * oozie-env environment variable for specifying the Oozie instance ID
103     */
104    public static final String OOZIE_INSTANCE_ID = "oozie.instance.id";
105
106    /**
107     * oozie-site property for specifying that ZooKeeper is secure.
108     */
109    public static final String ZK_SECURE = "oozie.zookeeper.secure";
110
111    private static final String ZK_OOZIE_SERVICE = "servers";
112    /**
113     * Services that need to put a node in zookeeper should go under here.  Try to keep this area clean and organized.
114     */
115    public static final String ZK_BASE_SERVICES_PATH = "/services";
116
117    private static Set<Object> users = new HashSet<Object>();
118    private CuratorFramework client = null;
119    private String zkId;
120    private long zkRegTime;
121    private ServiceDiscovery<Map> sDiscovery;
122    private ServiceCache<Map> sCache;
123    private List<ACL> saslACL;
124    private XLog log;
125
126    private static ZKUtils zk = null;
127    private static int zkConnectionTimeout;
128
129    /**
130     * Private Constructor for the singleton; it connects to ZooKeeper and advertises this Oozie Server.
131     *
132     * @throws Exception
133     */
134    private ZKUtils() throws Exception {
135        log = XLog.getLog(getClass());
136        zkId = ConfigurationService.get(OOZIE_INSTANCE_ID);
137        if (zkId.isEmpty()) {
138            zkId = ConfigurationService.get("oozie.http.hostname");
139        }
140        createClient();
141        advertiseService();
142        checkAndSetACLs();
143    }
144
145    /**
146     * Classes that want to use ZooKeeper should call this method to get the ZKUtils singleton.
147     *
148     * @param user The calling class
149     * @return the ZKUtils singleton
150     * @throws Exception
151     */
152    public static synchronized ZKUtils register(Object user) throws Exception {
153        if (zk == null) {
154            zk = new ZKUtils();
155        }
156        // Remember the calling class so we can disconnect when everybody is done
157        users.add(user);
158        return zk;
159    }
160
161    /**
162     * Classes should call this when they are done (i.e. shutdown).
163     *
164     * @param user The calling class
165     */
166    public synchronized void unregister(Object user) {
167        // If there are no more classes using ZooKeeper, we should teardown everything.
168        users.remove(user);
169        if (users.isEmpty() && zk != null) {
170            if (ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
171                zk.teardown();
172            }
173            zk = null;
174        }
175    }
176
177    private void createClient() throws Exception {
178        // Connect to the ZooKeeper server
179        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
180        String zkConnectionString = ConfigurationService.get(ZK_CONNECTION_STRING);
181        String zkNamespace = getZKNameSpace();
182        zkConnectionTimeout = ConfigurationService.getInt(ZK_CONNECTION_TIMEOUT);
183
184        ACLProvider aclProvider;
185        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
186            log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
187            setJaasConfiguration();
188            System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
189            System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
190            saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal())));
191            aclProvider = new SASLOwnerACLProvider();
192        } else {
193            log.info("Connecting to ZooKeeper without authentication");
194            aclProvider = new DefaultACLProvider();     // open to everyone
195        }
196        client = CuratorFrameworkFactory.builder()
197                                            .namespace(zkNamespace)
198                                            .connectString(zkConnectionString)
199                                            .retryPolicy(retryPolicy)
200                                            .aclProvider(aclProvider)
201                                            .connectionTimeoutMs(zkConnectionTimeout * 1000) // in ms
202                                            .build();
203        client.start();
204        client.getConnectionStateListenable().addListener(new ZKConnectionListener());
205    }
206
207    private void advertiseService() throws Exception {
208        // Advertise on the service discovery
209        new EnsurePath(ZK_BASE_SERVICES_PATH).ensure(client.getZookeeperClient());
210        InstanceSerializer<Map> instanceSerializer = new FixedJsonInstanceSerializer<Map>(Map.class);
211        sDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
212                                                .basePath(ZK_BASE_SERVICES_PATH)
213                                                .client(client)
214                                                .serializer(instanceSerializer)
215                                                .build();
216        sDiscovery.start();
217        sDiscovery.registerService(getMetadataInstance());
218
219        // Create the service discovery cache
220        sCache = sDiscovery.serviceCacheBuilder().name(ZK_OOZIE_SERVICE).build();
221        sCache.start();
222
223        zkRegTime = sDiscovery.queryForInstance(ZK_OOZIE_SERVICE, zkId).getRegistrationTimeUTC();
224    }
225
226    private void unadvertiseService() throws Exception {
227        // Stop the service discovery cache
228        sCache.close();
229
230        // Unadvertise on the service discovery
231        sDiscovery.unregisterService(getMetadataInstance());
232        sDiscovery.close();
233    }
234
235    private void teardown() {
236        try {
237            zk.unadvertiseService();
238        }
239        catch (Exception ex) {
240            log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex);
241        }
242        client.close();
243        client = null;
244    }
245
246    private ServiceInstance<Map> getMetadataInstance() throws Exception {
247        // Creates the metadata that this server is providing to ZooKeeper and other Oozie Servers
248        String url = ConfigUtils.getOozieEffectiveUrl();
249        Map<String, String> map = new HashMap<String, String>();
250        map.put(ZKMetadataKeys.OOZIE_ID, zkId);
251        map.put(ZKMetadataKeys.OOZIE_URL, url);
252
253        return ServiceInstance.<Map>builder()
254            .name(ZK_OOZIE_SERVICE)
255            .id(zkId)
256            .payload(map)
257            .build();
258    }
259
260    /**
261     * Returns a list of the metadata provided by all of the Oozie Servers.  Note that the metadata is cached so it may be a second
262     * or two stale.
263     *
264     * @return a List of the metadata provided by all of the Oozie Servers.
265     */
266    public List<ServiceInstance<Map>> getAllMetaData() {
267        List<ServiceInstance<Map>> instances = null;
268        if (sCache != null) {
269            instances = sCache.getInstances();
270        }
271        return instances;
272    }
273
274    /**
275     * Returns the ID of this Oozie Server as seen by ZooKeeper and other Oozie Servers
276     *
277     * @return the ID of this Oozie Server
278     */
279    public String getZKId() {
280        return zkId;
281    }
282
283    /**
284     * Returns the {@link CuratorFramework} used for managing the ZooKeeper connection; it can be used by calling classes to perform
285     * more direct operations on ZooKeeper.  Most of the time, this shouldn't be needed.
286     * <p>
287     * Be careful not to close the connection.
288     *
289     * @return the CuratorFramework object
290     */
291    public CuratorFramework getClient() {
292        return client;
293    }
294
295    /**
296     * Returns the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
297     *
298     * @param oozies The collection of metadata provided by all of the Oozie Servers (from calling {@link ZKUtils#getAllMetaData())
299     * @return the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
300     */
301    public int getZKIdIndex(List<ServiceInstance<Map>> oozies) {
302        int index = 0;
303        // We don't actually have to sort all of the IDs, we can simply find out how many are before our zkId
304        for (ServiceInstance<Map> oozie : oozies) {
305            long otherRegTime = oozie.getRegistrationTimeUTC();
306            if (otherRegTime < zkRegTime) {
307                index++;
308            }
309        }
310        return index;
311    }
312
313    private void checkAndSetACLs() throws Exception {
314        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
315            // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
316            // and set the ACLs for them
317            // We can't get the namespace znode through curator; have to go through zk client
318            String namespace = "/" + client.getNamespace();
319            if (client.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) {
320                List<ACL> acls = client.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat());
321                if (!acls.get(0).getId().getScheme().equals("sasl")) {
322                    log.info("'sasl' ACLs not set; setting...");
323                    List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(namespace, null);
324                    for (String child : children) {
325                        checkAndSetACLs("/" + child);
326                    }
327                    client.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1);
328                }
329            }
330        }
331    }
332
333    private void checkAndSetACLs(String path) throws Exception {
334        List<String> children = client.getChildren().forPath(path);
335        for (String child : children) {
336            checkAndSetACLs(path + "/" + child);
337        }
338        client.setACL().withACL(saslACL).forPath(path);
339    }
340
341    // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
342    private void setJaasConfiguration() throws ServiceException, IOException {
343        String keytabFile = Services.get().getConf().get(KERBEROS_KEYTAB, System.getProperty("user.home") + "/oozie.keytab").trim();
344        if (keytabFile.length() == 0) {
345            throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
346        }
347        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
348        if (principal.length() == 0) {
349            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
350        }
351
352        // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
353        // point to it (but this way we don't have to write a file, and it works better for the tests)
354        JaasConfiguration.addEntry("Client", principal, keytabFile);
355        Configuration.setConfiguration(JaasConfiguration.getInstance());
356    }
357
358    private String getServicePrincipal() throws ServiceException {
359        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
360        if (principal.length() == 0) {
361            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
362        }
363        return principal.split("[/@]")[0];
364    }
365
366    /**
367     * Useful for tests to get the registered classes
368     *
369     * @return the set of registered classes
370     */
371    @VisibleForTesting
372    public static Set<Object> getUsers() {
373        return users;
374    }
375
376    /**
377     * Keys used in the metadata provided by each Oozie Server to ZooKeeper and other Oozie Servers
378     */
379    public abstract class ZKMetadataKeys {
380        /**
381         * The ID of the Oozie Server
382         */
383        public static final String OOZIE_ID = "OOZIE_ID";
384        /**
385         * The URL of the Oozie Server
386         */
387        public static final String OOZIE_URL = "OOZIE_URL";
388    }
389
390    /**
391     * Simple implementation of an {@link ACLProvider} that simply returns {@link #saslACL}.
392     */
393    public class SASLOwnerACLProvider implements ACLProvider {
394
395        @Override
396        public List<ACL> getDefaultAcl() {
397            return saslACL;
398        }
399
400        @Override
401        public List<ACL> getAclForPath(String path) {
402            return saslACL;
403        }
404    }
405
406    /**
407     * Returns retry policy
408     *
409     * @return RetryPolicy
410     */
411    public static RetryPolicy getRetryPolicy() {
412        return new ExponentialBackoffRetry(1000, 3);
413    }
414
415    /**
416     * Returns configured zk namesapces
417     * @return oozie.zookeeper.namespace
418     */
419    public static String getZKNameSpace() {
420        return ConfigurationService.get(ZK_NAMESPACE);
421    }
422    /**
423     * Return ZK connection timeout
424     * @return
425     */
426    public static int getZKConnectionTimeout(){
427        return zkConnectionTimeout;
428    }
429}