001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import com.google.common.annotations.VisibleForTesting;
021
022import java.io.IOException;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import javax.security.auth.login.Configuration;
031
032import org.apache.curator.RetryPolicy;
033import org.apache.curator.framework.CuratorFramework;
034import org.apache.curator.framework.CuratorFrameworkFactory;
035import org.apache.curator.framework.api.ACLProvider;
036import org.apache.curator.framework.imps.DefaultACLProvider;
037import org.apache.curator.framework.state.ConnectionState;
038import org.apache.curator.retry.ExponentialBackoffRetry;
039import org.apache.curator.utils.EnsurePath;
040import org.apache.curator.x.discovery.ServiceCache;
041import org.apache.curator.x.discovery.ServiceDiscovery;
042import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
043import org.apache.curator.x.discovery.ServiceInstance;
044import org.apache.curator.x.discovery.details.InstanceSerializer;
045import org.apache.oozie.ErrorCode;
046
047import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_KEYTAB;
048import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_PRINCIPAL;
049
050import org.apache.oozie.event.listener.ZKConnectionListener;
051import org.apache.oozie.service.ConfigurationService;
052import org.apache.oozie.service.ServiceException;
053import org.apache.oozie.service.Services;
054import org.apache.zookeeper.ZooDefs.Perms;
055import org.apache.zookeeper.client.ZooKeeperSaslClient;
056import org.apache.zookeeper.data.ACL;
057import org.apache.zookeeper.data.Id;
058import org.apache.zookeeper.data.Stat;
059
060/**
061 * This class provides a singleton for interacting with ZooKeeper that other classes can use.  It handles connecting to ZooKeeper,
062 * service discovery, and publishing metadata about this server.
063 * <p>
064 * Users of this class should call {@link ZKUtils#register(java.lang.Object)} to obtain the singleton.  This will ensure that we're
065 * properly connected and ready to go with ZooKeeper.  When the user is done (i.e. on shutdown), it should call
066 * {@link ZKUtils#unregister(java.lang.Object)} to let this class know; once there are no more users, this class will automatically
067 * remove itself from ZooKeeper.
068 * <p>
069 * Each Oozie Server provides metadata that can be shared with the other Oozie Servers.  To keep things simple and to make it easy
070 * to add additional metadata in the future, we share a Map.  They keys are defined in {@link ZKMetadataKeys}.
071 * <p>
072 * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is
073 * /oozie/services/).  ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named
074 * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload.  For example, with the default settings,
075 * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata.
076 * <p>
077 * If oozie.zookeeper.secure is set to true, then Oozie will (a) use jaas to connect to ZooKeeper using SASL/Kerberos based on
078 * Oozie's existing security configuration parameters (b) use/convert every znode under the namespace (including the namespace
079 * itself) to have ACLs such that only Oozie servers have access (i.e. if "service/host@REALM" is the Kerberos principal, then
080 * "service" will be used for the ACLs).
081 * <p>
082 * Oozie server will shutdown itself if ZK connection is lost for ${ZK_CONNECTION_TIMEOUT}.
083 */
084public class ZKUtils {
085    /**
086     * oozie-site property for specifying the ZooKeeper connection string.  Comma-separated values of host:port pairs of the
087     * ZooKeeper servers.
088     */
089    public static final String ZK_CONNECTION_STRING = "oozie.zookeeper.connection.string";
090    /**
091     * oozie-site property for specifying the ZooKeeper namespace to use (e.g. "oozie").  All of the Oozie servers that are planning
092     * on talking to each other should have the same value for this.
093     */
094    public static final String ZK_NAMESPACE = "oozie.zookeeper.namespace";
095
096    /**
097     *Default ZK connection timeout ( in sec).
098     */
099    public static final String ZK_CONNECTION_TIMEOUT = "oozie.zookeeper.connection.timeout";
100
101    /**
102     *Default ZK session timeout ( in sec). If connection is lost after retry, then Oozie server will shutdown itself.
103     */
104    public static final String ZK_SESSION_TIMEOUT = "oozie.zookeeper.session.timeout";
105
106    /**
107     * Maximum number of times to retry.
108     */
109    public static final String ZK_MAX_RETRIES = "oozie.zookeeper.max.retries";
110
111    /**
112     * oozie-env environment variable for specifying the Oozie instance ID
113     */
114    public static final String OOZIE_INSTANCE_ID = "oozie.instance.id";
115
116    /**
117     * oozie-site property for specifying that ZooKeeper is secure.
118     */
119    public static final String ZK_SECURE = "oozie.zookeeper.secure";
120
121    private static final String ZK_OOZIE_SERVICE = "servers";
122    /**
123     * Services that need to put a node in zookeeper should go under here.  Try to keep this area clean and organized.
124     */
125    public static final String ZK_BASE_SERVICES_PATH = "/services";
126
127    private static Set<Object> users = new HashSet<Object>();
128    private CuratorFramework client = null;
129    private String zkId;
130    private long zkRegTime;
131    private ServiceDiscovery<Map> sDiscovery;
132    private ServiceCache<Map> sCache;
133    private List<ACL> saslACL;
134    private XLog log;
135
136    private static ZKUtils zk = null;
137
138
139    /**
140     * Private Constructor for the singleton; it connects to ZooKeeper and advertises this Oozie Server.
141     *
142     * @throws Exception
143     */
144    private ZKUtils() throws Exception {
145        log = XLog.getLog(getClass());
146        zkId = ConfigurationService.get(OOZIE_INSTANCE_ID);
147        if (zkId.isEmpty()) {
148            zkId = ConfigurationService.get("oozie.http.hostname");
149        }
150        createClient();
151        advertiseService();
152        checkAndSetACLs();
153    }
154
155    /**
156     * Classes that want to use ZooKeeper should call this method to get the ZKUtils singleton.
157     *
158     * @param user The calling class
159     * @return the ZKUtils singleton
160     * @throws Exception
161     */
162    public static synchronized ZKUtils register(Object user) throws Exception {
163        if (zk == null) {
164            zk = new ZKUtils();
165        }
166        // Remember the calling class so we can disconnect when everybody is done
167        users.add(user);
168        return zk;
169    }
170
171    /**
172     * Classes should call this when they are done (i.e. shutdown).
173     *
174     * @param user The calling class
175     */
176    public synchronized void unregister(Object user) {
177        // If there are no more classes using ZooKeeper, we should teardown everything.
178        users.remove(user);
179        if (users.isEmpty() && zk != null) {
180            if (ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
181                zk.teardown();
182            }
183            zk = null;
184        }
185    }
186
187    private void createClient() throws Exception {
188        // Connect to the ZooKeeper server
189        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
190        String zkConnectionString = ConfigurationService.get(ZK_CONNECTION_STRING);
191        String zkNamespace = getZKNameSpace();
192        int zkConnectionTimeout = ConfigurationService.getInt(ZK_CONNECTION_TIMEOUT);
193        int zkSessionTimeout = ConfigurationService.getInt(ZK_SESSION_TIMEOUT, 300);
194
195        ACLProvider aclProvider;
196        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
197            log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
198            setJaasConfiguration();
199            System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
200            System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
201            saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal())));
202            aclProvider = new SASLOwnerACLProvider();
203        } else {
204            log.info("Connecting to ZooKeeper without authentication");
205            aclProvider = new DefaultACLProvider();     // open to everyone
206        }
207        client = CuratorFrameworkFactory.builder()
208                                            .namespace(zkNamespace)
209                                            .connectString(zkConnectionString)
210                                            .retryPolicy(retryPolicy)
211                                            .aclProvider(aclProvider)
212                                            .connectionTimeoutMs(zkConnectionTimeout * 1000) // in ms
213                                            .sessionTimeoutMs(zkSessionTimeout * 1000) //in ms
214                                            .build();
215        client.start();
216        client.getConnectionStateListenable().addListener(new ZKConnectionListener());
217    }
218
219    private void advertiseService() throws Exception {
220        // Advertise on the service discovery
221        new EnsurePath(ZK_BASE_SERVICES_PATH).ensure(client.getZookeeperClient());
222        InstanceSerializer<Map> instanceSerializer = new FixedJsonInstanceSerializer<Map>(Map.class);
223        sDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
224                                                .basePath(ZK_BASE_SERVICES_PATH)
225                                                .client(client)
226                                                .serializer(instanceSerializer)
227                                                .build();
228        sDiscovery.start();
229        sDiscovery.registerService(getMetadataInstance());
230
231        // Create the service discovery cache
232        sCache = sDiscovery.serviceCacheBuilder().name(ZK_OOZIE_SERVICE).build();
233        sCache.start();
234
235        zkRegTime = sDiscovery.queryForInstance(ZK_OOZIE_SERVICE, zkId).getRegistrationTimeUTC();
236    }
237
238    private void unadvertiseService() throws Exception {
239        // Stop the service discovery cache
240        sCache.close();
241
242        // Unadvertise on the service discovery
243        sDiscovery.unregisterService(getMetadataInstance());
244        sDiscovery.close();
245    }
246
247    private void teardown() {
248        try {
249            zk.unadvertiseService();
250        }
251        catch (Exception ex) {
252            log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex);
253        }
254        client.close();
255        client = null;
256    }
257
258    private ServiceInstance<Map> getMetadataInstance() throws Exception {
259        // Creates the metadata that this server is providing to ZooKeeper and other Oozie Servers
260        String url = ConfigUtils.getOozieEffectiveUrl();
261        Map<String, String> map = new HashMap<String, String>();
262        map.put(ZKMetadataKeys.OOZIE_ID, zkId);
263        map.put(ZKMetadataKeys.OOZIE_URL, url);
264
265        return ServiceInstance.<Map>builder()
266            .name(ZK_OOZIE_SERVICE)
267            .id(zkId)
268            .payload(map)
269            .build();
270    }
271
272    /**
273     * Returns a list of the metadata provided by all of the Oozie Servers.  Note that the metadata is cached so it may be a second
274     * or two stale.
275     *
276     * @return a List of the metadata provided by all of the Oozie Servers.
277     */
278    public List<ServiceInstance<Map>> getAllMetaData() {
279        List<ServiceInstance<Map>> instances = null;
280        if (sCache != null) {
281            instances = sCache.getInstances();
282        }
283        return instances;
284    }
285
286    /**
287     * Returns the ID of this Oozie Server as seen by ZooKeeper and other Oozie Servers
288     *
289     * @return the ID of this Oozie Server
290     */
291    public String getZKId() {
292        return zkId;
293    }
294
295    /**
296     * Returns the {@link CuratorFramework} used for managing the ZooKeeper connection; it can be used by calling classes to perform
297     * more direct operations on ZooKeeper.  Most of the time, this shouldn't be needed.
298     * <p>
299     * Be careful not to close the connection.
300     *
301     * @return the CuratorFramework object
302     */
303    public CuratorFramework getClient() {
304        return client;
305    }
306
307    /**
308     * Returns the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
309     *
310     * @param oozies The collection of metadata provided by all of the Oozie Servers (from calling {@link ZKUtils#getAllMetaData()}
311     * @return the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
312     */
313    public int getZKIdIndex(List<ServiceInstance<Map>> oozies) {
314        int index = 0;
315        // We don't actually have to sort all of the IDs, we can simply find out how many are before our zkId
316        for (ServiceInstance<Map> oozie : oozies) {
317            long otherRegTime = oozie.getRegistrationTimeUTC();
318            if (otherRegTime < zkRegTime) {
319                index++;
320            }
321        }
322        return index;
323    }
324
325    private void checkAndSetACLs() throws Exception {
326        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
327            // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
328            // and set the ACLs for them
329            // We can't get the namespace znode through curator; have to go through zk client
330            String namespace = "/" + client.getNamespace();
331            if (client.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) {
332                List<ACL> acls = client.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat());
333                if (!acls.get(0).getId().getScheme().equals("sasl")) {
334                    log.info("'sasl' ACLs not set; setting...");
335                    List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(namespace, null);
336                    for (String child : children) {
337                        checkAndSetACLs("/" + child);
338                    }
339                    client.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1);
340                }
341            }
342        }
343    }
344
345    private void checkAndSetACLs(String path) throws Exception {
346        List<String> children = client.getChildren().forPath(path);
347        for (String child : children) {
348            checkAndSetACLs(path + "/" + child);
349        }
350        client.setACL().withACL(saslACL).forPath(path);
351    }
352
353    // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
354    private void setJaasConfiguration() throws ServiceException, IOException {
355        String keytabFile = Services.get().getConf().get(KERBEROS_KEYTAB, System.getProperty("user.home") + "/oozie.keytab").trim();
356        if (keytabFile.length() == 0) {
357            throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
358        }
359        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
360        if (principal.length() == 0) {
361            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
362        }
363
364        // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
365        // point to it (but this way we don't have to write a file, and it works better for the tests)
366        JaasConfiguration.addEntry("Client", principal, keytabFile);
367        Configuration.setConfiguration(JaasConfiguration.getInstance());
368    }
369
370    private String getServicePrincipal() throws ServiceException {
371        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
372        if (principal.length() == 0) {
373            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
374        }
375        return principal.split("[/@]")[0];
376    }
377
378    /**
379     * Useful for tests to get the registered classes
380     *
381     * @return the set of registered classes
382     */
383    @VisibleForTesting
384    public static Set<Object> getUsers() {
385        return users;
386    }
387
388    /**
389     * Keys used in the metadata provided by each Oozie Server to ZooKeeper and other Oozie Servers
390     */
391    public abstract class ZKMetadataKeys {
392        /**
393         * The ID of the Oozie Server
394         */
395        public static final String OOZIE_ID = "OOZIE_ID";
396        /**
397         * The URL of the Oozie Server
398         */
399        public static final String OOZIE_URL = "OOZIE_URL";
400    }
401
402    /**
403     * Simple implementation of an {@link ACLProvider} that simply returns {@link #saslACL}.
404     */
405    public class SASLOwnerACLProvider implements ACLProvider {
406
407        @Override
408        public List<ACL> getDefaultAcl() {
409            return saslACL;
410        }
411
412        @Override
413        public List<ACL> getAclForPath(String path) {
414            return saslACL;
415        }
416    }
417
418    /**
419     * Returns retry policy
420     *
421     * @return RetryPolicy
422     */
423    public static RetryPolicy getRetryPolicy() {
424        return new ExponentialBackoffRetry(1000, ConfigurationService.getInt(ZK_MAX_RETRIES, 10));
425    }
426
427    /**
428     * Returns configured zk namesapces
429     * @return oozie.zookeeper.namespace
430     */
431    public static String getZKNameSpace() {
432        return ConfigurationService.get(ZK_NAMESPACE);
433    }
434}