001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import com.google.common.annotations.VisibleForTesting;
021
022import java.io.IOException;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import javax.security.auth.login.Configuration;
031
032import org.apache.curator.RetryPolicy;
033import org.apache.curator.framework.CuratorFramework;
034import org.apache.curator.framework.CuratorFrameworkFactory;
035import org.apache.curator.framework.api.ACLProvider;
036import org.apache.curator.framework.imps.DefaultACLProvider;
037import org.apache.curator.framework.state.ConnectionState;
038import org.apache.curator.retry.ExponentialBackoffRetry;
039import org.apache.curator.utils.EnsurePath;
040import org.apache.curator.x.discovery.ServiceCache;
041import org.apache.curator.x.discovery.ServiceDiscovery;
042import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
043import org.apache.curator.x.discovery.ServiceInstance;
044import org.apache.curator.x.discovery.details.InstanceSerializer;
045import org.apache.oozie.ErrorCode;
046
047import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_KEYTAB;
048import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_PRINCIPAL;
049
050import org.apache.oozie.event.listener.ZKConnectionListener;
051import org.apache.oozie.service.ConfigurationService;
052import org.apache.oozie.service.ServiceException;
053import org.apache.oozie.service.Services;
054import org.apache.zookeeper.ZooDefs.Perms;
055import org.apache.zookeeper.client.ZooKeeperSaslClient;
056import org.apache.zookeeper.data.ACL;
057import org.apache.zookeeper.data.Id;
058import org.apache.zookeeper.data.Stat;
059
060/**
061 * This class provides a singleton for interacting with ZooKeeper that other classes can use.  It handles connecting to ZooKeeper,
062 * service discovery, and publishing metadata about this server.
063 * <p>
064 * Users of this class should call {@link ZKUtils#register(java.lang.Object)} to obtain the singleton.  This will ensure that we're
065 * properly connected and ready to go with ZooKeeper.  When the user is done (i.e. on shutdown), it should call
066 * {@link ZKUtils#unregister(java.lang.Object)} to let this class know; once there are no more users, this class will automatically
067 * remove itself from ZooKeeper.
068 * <p>
069 * Each Oozie Server provides metadata that can be shared with the other Oozie Servers.  To keep things simple and to make it easy
070 * to add additional metadata in the future, we share a Map.  They keys are defined in {@link ZKMetadataKeys}.
071 * <p>
072 * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is
073 * /oozie/services/).  ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named
074 * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload.  For example, with the default settings,
075 * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata.
076 * <p>
077 * If oozie.zookeeper.secure is set to true, then Oozie will (a) use jaas to connect to ZooKeeper using SASL/Kerberos based on
078 * Oozie's existing security configuration parameters (b) use/convert every znode under the namespace (including the namespace
079 * itself) to have ACLs such that only Oozie servers have access (i.e. if "service/host@REALM" is the Kerberos principal, then
080 * "service" will be used for the ACLs).
081 * <p>
082 * Oozie server will shutdown itself if ZK connection is lost for ${ZK_CONNECTION_TIMEOUT}.
083 */
084public class ZKUtils {
085    /**
086     * oozie-site property for specifying the ZooKeeper connection string.  Comma-separated values of host:port pairs of the
087     * ZooKeeper servers.
088     */
089    public static final String ZK_CONNECTION_STRING = "oozie.zookeeper.connection.string";
090    /**
091     * oozie-site property for specifying the ZooKeeper namespace to use (e.g. "oozie").  All of the Oozie servers that are planning
092     * on talking to each other should have the same value for this.
093     */
094    public static final String ZK_NAMESPACE = "oozie.zookeeper.namespace";
095
096    /**
097     *Default ZK connection timeout ( in sec).
098     */
099    public static final String ZK_CONNECTION_TIMEOUT = "oozie.zookeeper.connection.timeout";
100
101    /**
102     *Default ZK session timeout ( in sec). If connection is lost after retry, then Oozie server will shutdown itself.
103     */
104    public static final String ZK_SESSION_TIMEOUT = "oozie.zookeeper.session.timeout";
105
106    /**
107     * Maximum number of times to retry.
108     */
109    public static final String ZK_MAX_RETRIES = "oozie.zookeeper.max.retries";
110
111    /**
112     * oozie-env environment variable for specifying the Oozie instance ID
113     */
114    public static final String OOZIE_INSTANCE_ID = "oozie.instance.id";
115
116    /**
117     * oozie-site property for specifying that ZooKeeper is secure.
118     */
119    public static final String ZK_SECURE = "oozie.zookeeper.secure";
120
121    private static final String ZK_OOZIE_SERVICE = "servers";
122    /**
123     * Services that need to put a node in zookeeper should go under here.  Try to keep this area clean and organized.
124     */
125    public static final String ZK_BASE_SERVICES_PATH = "/services";
126
127    private static Set<Object> users = new HashSet<Object>();
128    private CuratorFramework client = null;
129    private String zkId;
130    private long zkRegTime;
131    private ServiceDiscovery<Map> sDiscovery;
132    private ServiceCache<Map> sCache;
133    private List<ACL> saslACL;
134    private XLog log;
135
136    private static ZKUtils zk = null;
137
138
139    /**
140     * Private Constructor for the singleton; it connects to ZooKeeper and advertises this Oozie Server.
141     *
142     * @throws Exception
143     */
144    private ZKUtils() throws Exception {
145        log = XLog.getLog(getClass());
146        zkId = ConfigurationService.get(OOZIE_INSTANCE_ID);
147        if (zkId.isEmpty()) {
148            zkId = ConfigurationService.get("oozie.http.hostname");
149        }
150        createClient();
151        advertiseService();
152        checkAndSetACLs();
153    }
154
155    /**
156     * Classes that want to use ZooKeeper should call this method to get the ZKUtils singleton.
157     *
158     * @param user The calling class
159     * @return the ZKUtils singleton
160     * @throws Exception
161     */
162    public static synchronized ZKUtils register(Object user) throws Exception {
163        if (zk == null) {
164            zk = new ZKUtils();
165        }
166        // Remember the calling class so we can disconnect when everybody is done
167        users.add(user);
168        return zk;
169    }
170
171    /**
172     * Classes should call this when they are done (i.e. shutdown).
173     *
174     * @param user The calling class
175     */
176    public synchronized void unregister(Object user) {
177        // If there are no more classes using ZooKeeper, we should teardown everything.
178        users.remove(user);
179        if (users.isEmpty() && zk != null) {
180            zk.teardown();
181            zk = null;
182        }
183    }
184
185    private void createClient() throws Exception {
186        // Connect to the ZooKeeper server
187        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
188        String zkConnectionString = ConfigurationService.get(ZK_CONNECTION_STRING);
189        String zkNamespace = getZKNameSpace();
190        int zkConnectionTimeout = ConfigurationService.getInt(ZK_CONNECTION_TIMEOUT);
191        int zkSessionTimeout = ConfigurationService.getInt(ZK_SESSION_TIMEOUT, 300);
192
193        ACLProvider aclProvider;
194        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
195            log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
196            setJaasConfiguration();
197            System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
198            System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
199            saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal())));
200            aclProvider = new SASLOwnerACLProvider();
201        } else {
202            log.info("Connecting to ZooKeeper without authentication");
203            aclProvider = new DefaultACLProvider();     // open to everyone
204        }
205        client = CuratorFrameworkFactory.builder()
206                                            .namespace(zkNamespace)
207                                            .connectString(zkConnectionString)
208                                            .retryPolicy(retryPolicy)
209                                            .aclProvider(aclProvider)
210                                            .connectionTimeoutMs(zkConnectionTimeout * 1000) // in ms
211                                            .sessionTimeoutMs(zkSessionTimeout * 1000) //in ms
212                                            .build();
213        client.start();
214        client.getConnectionStateListenable().addListener(new ZKConnectionListener());
215    }
216
217    private void advertiseService() throws Exception {
218        // Advertise on the service discovery
219        new EnsurePath(ZK_BASE_SERVICES_PATH).ensure(client.getZookeeperClient());
220        InstanceSerializer<Map> instanceSerializer = new FixedJsonInstanceSerializer<Map>(Map.class);
221        sDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
222                                                .basePath(ZK_BASE_SERVICES_PATH)
223                                                .client(client)
224                                                .serializer(instanceSerializer)
225                                                .build();
226        sDiscovery.start();
227        sDiscovery.registerService(getMetadataInstance());
228
229        // Create the service discovery cache
230        sCache = sDiscovery.serviceCacheBuilder().name(ZK_OOZIE_SERVICE).build();
231        sCache.start();
232
233        zkRegTime = sDiscovery.queryForInstance(ZK_OOZIE_SERVICE, zkId).getRegistrationTimeUTC();
234    }
235
236    private void unadvertiseService() throws Exception {
237        // Stop the service discovery cache
238        sCache.close();
239
240        // Unadvertise on the service discovery
241        sDiscovery.unregisterService(getMetadataInstance());
242        sDiscovery.close();
243    }
244
245    private void teardown() {
246        try {
247            zk.unadvertiseService();
248        }
249        catch (Exception ex) {
250            log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex);
251        }
252        client.close();
253        client = null;
254    }
255
256    private ServiceInstance<Map> getMetadataInstance() throws Exception {
257        // Creates the metadata that this server is providing to ZooKeeper and other Oozie Servers
258        String url = ConfigUtils.getOozieEffectiveUrl();
259        Map<String, String> map = new HashMap<String, String>();
260        map.put(ZKMetadataKeys.OOZIE_ID, zkId);
261        map.put(ZKMetadataKeys.OOZIE_URL, url);
262
263        return ServiceInstance.<Map>builder()
264            .name(ZK_OOZIE_SERVICE)
265            .id(zkId)
266            .payload(map)
267            .build();
268    }
269
270    /**
271     * Returns a list of the metadata provided by all of the Oozie Servers.  Note that the metadata is cached so it may be a second
272     * or two stale.
273     *
274     * @return a List of the metadata provided by all of the Oozie Servers.
275     */
276    public List<ServiceInstance<Map>> getAllMetaData() {
277        List<ServiceInstance<Map>> instances = null;
278        if (sCache != null) {
279            instances = sCache.getInstances();
280        }
281        return instances;
282    }
283
284    /**
285     * Returns the ID of this Oozie Server as seen by ZooKeeper and other Oozie Servers
286     *
287     * @return the ID of this Oozie Server
288     */
289    public String getZKId() {
290        return zkId;
291    }
292
293    /**
294     * Returns the {@link CuratorFramework} used for managing the ZooKeeper connection; it can be used by calling classes to perform
295     * more direct operations on ZooKeeper.  Most of the time, this shouldn't be needed.
296     * <p>
297     * Be careful not to close the connection.
298     *
299     * @return the CuratorFramework object
300     */
301    public CuratorFramework getClient() {
302        return client;
303    }
304
305    /**
306     * Returns the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
307     *
308     * @param oozies The collection of metadata provided by all of the Oozie Servers (from calling {@link ZKUtils#getAllMetaData()}
309     * @return the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
310     */
311    public int getZKIdIndex(List<ServiceInstance<Map>> oozies) {
312        int index = 0;
313        // We don't actually have to sort all of the IDs, we can simply find out how many are before our zkId
314        for (ServiceInstance<Map> oozie : oozies) {
315            long otherRegTime = oozie.getRegistrationTimeUTC();
316            if (otherRegTime < zkRegTime) {
317                index++;
318            }
319        }
320        return index;
321    }
322
323    private void checkAndSetACLs() throws Exception {
324        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
325            // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
326            // and set the ACLs for them
327            // We can't get the namespace znode through curator; have to go through zk client
328            String namespace = "/" + client.getNamespace();
329            if (client.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) {
330                List<ACL> acls = client.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat());
331                if (!acls.get(0).getId().getScheme().equals("sasl")) {
332                    log.info("'sasl' ACLs not set; setting...");
333                    List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(namespace, null);
334                    for (String child : children) {
335                        checkAndSetACLs("/" + child);
336                    }
337                    client.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1);
338                }
339            }
340        }
341    }
342
343    private void checkAndSetACLs(String path) throws Exception {
344        List<String> children = client.getChildren().forPath(path);
345        for (String child : children) {
346            checkAndSetACLs(path + "/" + child);
347        }
348        client.setACL().withACL(saslACL).forPath(path);
349    }
350
351    // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
352    private void setJaasConfiguration() throws ServiceException, IOException {
353        String keytabFile = Services.get().getConf().get(KERBEROS_KEYTAB, System.getProperty("user.home") + "/oozie.keytab").trim();
354        if (keytabFile.length() == 0) {
355            throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
356        }
357        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
358        if (principal.length() == 0) {
359            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
360        }
361
362        // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
363        // point to it (but this way we don't have to write a file, and it works better for the tests)
364        JaasConfiguration.addEntry("Client", principal, keytabFile);
365        Configuration.setConfiguration(JaasConfiguration.getInstance());
366    }
367
368    private String getServicePrincipal() throws ServiceException {
369        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
370        if (principal.length() == 0) {
371            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
372        }
373        return principal.split("[/@]")[0];
374    }
375
376    /**
377     * Useful for tests to get the registered classes
378     *
379     * @return the set of registered classes
380     */
381    @VisibleForTesting
382    public static Set<Object> getUsers() {
383        return users;
384    }
385
386    /**
387     * Keys used in the metadata provided by each Oozie Server to ZooKeeper and other Oozie Servers
388     */
389    public abstract class ZKMetadataKeys {
390        /**
391         * The ID of the Oozie Server
392         */
393        public static final String OOZIE_ID = "OOZIE_ID";
394        /**
395         * The URL of the Oozie Server
396         */
397        public static final String OOZIE_URL = "OOZIE_URL";
398    }
399
400    /**
401     * Simple implementation of an {@link ACLProvider} that simply returns {@link #saslACL}.
402     */
403    public class SASLOwnerACLProvider implements ACLProvider {
404
405        @Override
406        public List<ACL> getDefaultAcl() {
407            return saslACL;
408        }
409
410        @Override
411        public List<ACL> getAclForPath(String path) {
412            return saslACL;
413        }
414    }
415
416    /**
417     * Returns retry policy
418     *
419     * @return RetryPolicy
420     */
421    public static RetryPolicy getRetryPolicy() {
422        return new ExponentialBackoffRetry(1000, ConfigurationService.getInt(ZK_MAX_RETRIES, 10));
423    }
424
425    /**
426     * Returns configured zk namesapces
427     * @return oozie.zookeeper.namespace
428     */
429    public static String getZKNameSpace() {
430        return ConfigurationService.get(ZK_NAMESPACE);
431    }
432}