001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import com.google.common.annotations.VisibleForTesting;
021import java.io.IOException;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import javax.security.auth.login.Configuration;
029import org.apache.curator.RetryPolicy;
030import org.apache.curator.framework.CuratorFramework;
031import org.apache.curator.framework.CuratorFrameworkFactory;
032import org.apache.curator.framework.api.ACLProvider;
033import org.apache.curator.framework.imps.DefaultACLProvider;
034import org.apache.curator.retry.ExponentialBackoffRetry;
035import org.apache.curator.utils.EnsurePath;
036import org.apache.curator.x.discovery.ServiceCache;
037import org.apache.curator.x.discovery.ServiceDiscovery;
038import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
039import org.apache.curator.x.discovery.ServiceInstance;
040import org.apache.curator.x.discovery.details.InstanceSerializer;
041import org.apache.oozie.ErrorCode;
042import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_KEYTAB;
043import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_PRINCIPAL;
044import org.apache.oozie.service.ServiceException;
045import org.apache.oozie.service.Services;
046import org.apache.zookeeper.ZooDefs.Perms;
047import org.apache.zookeeper.client.ZooKeeperSaslClient;
048import org.apache.zookeeper.data.ACL;
049import org.apache.zookeeper.data.Id;
050import org.apache.zookeeper.data.Stat;
051
052
053/**
054 * This class provides a singleton for interacting with ZooKeeper that other classes can use.  It handles connecting to ZooKeeper,
055 * service discovery, and publishing metadata about this server.
056 * <p>
057 * Users of this class should call {@link ZKUtils#register(java.lang.Object)} to obtain the singleton.  This will ensure that we're
058 * properly connected and ready to go with ZooKeeper.  When the user is done (i.e. on shutdown), it should call
059 * {@link ZKUtils#unregister(java.lang.Object)} to let this class know; once there are no more users, this class will automatically
060 * remove itself from ZooKeeper.
061 * <p>
 * Each Oozie Server provides metadata that can be shared with the other Oozie Servers.  To keep things simple and to make it easy
 * to add additional metadata in the future, we share a Map.  The keys are defined in {@link ZKMetadataKeys}.
064 * <p>
065 * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is
066 * /oozie/services/).  ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named
067 * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload.  For example, with the default settings,
068 * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata.
069 * <p>
070 * If oozie.zookeeper.secure is set to true, then Oozie will (a) use jaas to connect to ZooKeeper using SASL/Kerberos based on
071 * Oozie's existing security configuration parameters (b) use/convert every znode under the namespace (including the namespace
072 * itself) to have ACLs such that only Oozie servers have access (i.e. if "service/host@REALM" is the Kerberos principal, then
073 * "service" will be used for the ACLs).
074 */
075public class ZKUtils {
076    /**
077     * oozie-site property for specifying the ZooKeeper connection string.  Comma-separated values of host:port pairs of the
078     * ZooKeeper servers.
079     */
080    public static final String ZK_CONNECTION_STRING = "oozie.zookeeper.connection.string";
081    /**
082     * oozie-site property for specifying the ZooKeeper namespace to use (e.g. "oozie").  All of the Oozie servers that are planning
083     * on talking to each other should have the same value for this.
084     */
085    public static final String ZK_NAMESPACE = "oozie.zookeeper.namespace";
086
087    /**
088     * oozie-env environment variable for specifying the Oozie instance ID
089     */
090    public static final String OOZIE_INSTANCE_ID = "oozie.instance.id";
091
092    /**
093     * oozie-site property for specifying that ZooKeeper is secure.
094     */
095    public static final String ZK_SECURE = "oozie.zookeeper.secure";
096
097    private static final String ZK_OOZIE_SERVICE = "servers";
098    /**
099     * Services that need to put a node in zookeeper should go under here.  Try to keep this area clean and organized.
100     */
101    public static final String ZK_BASE_SERVICES_PATH = "/services";
102
103    private static Set<Object> users = new HashSet<Object>();
104    private CuratorFramework client = null;
105    private String zkId;
106    private long zkRegTime;
107    private ServiceDiscovery<Map> sDiscovery;
108    private ServiceCache<Map> sCache;
109    private List<ACL> saslACL;
110    private XLog log;
111
112    private static ZKUtils zk = null;
113
114    /**
115     * Private Constructor for the singleton; it connects to ZooKeeper and advertises this Oozie Server.
116     *
117     * @throws Exception
118     */
119    private ZKUtils() throws Exception {
120        log = XLog.getLog(getClass());
121        zkId = Services.get().getConf().get(OOZIE_INSTANCE_ID, Services.get().getConf().get("oozie.http.hostname"));
122        createClient();
123        advertiseService();
124        checkAndSetACLs();
125    }
126
127    /**
128     * Classes that want to use ZooKeeper should call this method to get the ZKUtils singleton.
129     *
130     * @param user The calling class
131     * @return the ZKUtils singleton
132     * @throws Exception
133     */
134    public static synchronized ZKUtils register(Object user) throws Exception {
135        if (zk == null) {
136            zk = new ZKUtils();
137        }
138        // Remember the calling class so we can disconnect when everybody is done
139        users.add(user);
140        return zk;
141    }
142
143    /**
144     * Classes should call this when they are done (i.e. shutdown).
145     *
146     * @param user The calling class
147     */
148    public synchronized void unregister(Object user) {
149        // If there are no more classes using ZooKeeper, we should teardown everything.
150        users.remove(user);
151        if (users.isEmpty() && zk != null) {
152            zk.teardown();
153            zk = null;
154        }
155    }
156
157    private void createClient() throws Exception {
158        // Connect to the ZooKeeper server
159        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
160        String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181");
161        String zkNamespace = getZKNameSpace();
162        ACLProvider aclProvider;
163        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
164            log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
165            setJaasConfiguration();
166            System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
167            System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
168            saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal())));
169            aclProvider = new SASLOwnerACLProvider();
170        } else {
171            log.info("Connecting to ZooKeeper without authentication");
172            aclProvider = new DefaultACLProvider();     // open to everyone
173        }
174        client = CuratorFrameworkFactory.builder()
175                                            .namespace(zkNamespace)
176                                            .connectString(zkConnectionString)
177                                            .retryPolicy(retryPolicy)
178                                            .aclProvider(aclProvider)
179                                            .build();
180        client.start();
181    }
182
183    private void advertiseService() throws Exception {
184        // Advertise on the service discovery
185        new EnsurePath(ZK_BASE_SERVICES_PATH).ensure(client.getZookeeperClient());
186        InstanceSerializer<Map> instanceSerializer = new FixedJsonInstanceSerializer<Map>(Map.class);
187        sDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
188                                                .basePath(ZK_BASE_SERVICES_PATH)
189                                                .client(client)
190                                                .serializer(instanceSerializer)
191                                                .build();
192        sDiscovery.start();
193        sDiscovery.registerService(getMetadataInstance());
194
195        // Create the service discovery cache
196        sCache = sDiscovery.serviceCacheBuilder().name(ZK_OOZIE_SERVICE).build();
197        sCache.start();
198
199        zkRegTime = sDiscovery.queryForInstance(ZK_OOZIE_SERVICE, zkId).getRegistrationTimeUTC();
200    }
201
202    private void unadvertiseService() throws Exception {
203        // Stop the service discovery cache
204        sCache.close();
205
206        // Unadvertise on the service discovery
207        sDiscovery.unregisterService(getMetadataInstance());
208        sDiscovery.close();
209    }
210
211    private void teardown() {
212        try {
213            zk.unadvertiseService();
214        }
215        catch (Exception ex) {
216            log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex);
217        }
218        client.close();
219        client = null;
220    }
221
222    private ServiceInstance<Map> getMetadataInstance() throws Exception {
223        // Creates the metadata that this server is providing to ZooKeeper and other Oozie Servers
224        String url = ConfigUtils.getOozieEffectiveUrl();
225        Map<String, String> map = new HashMap<String, String>();
226        map.put(ZKMetadataKeys.OOZIE_ID, zkId);
227        map.put(ZKMetadataKeys.OOZIE_URL, url);
228
229        return ServiceInstance.<Map>builder()
230            .name(ZK_OOZIE_SERVICE)
231            .id(zkId)
232            .payload(map)
233            .build();
234    }
235
236    /**
237     * Returns a list of the metadata provided by all of the Oozie Servers.  Note that the metadata is cached so it may be a second
238     * or two stale.
239     *
240     * @return a List of the metadata provided by all of the Oozie Servers.
241     */
242    public List<ServiceInstance<Map>> getAllMetaData() {
243        List<ServiceInstance<Map>> instances = null;
244        if (sCache != null) {
245            instances = sCache.getInstances();
246        }
247        return instances;
248    }
249
250    /**
251     * Returns the ID of this Oozie Server as seen by ZooKeeper and other Oozie Servers
252     *
253     * @return the ID of this Oozie Server
254     */
255    public String getZKId() {
256        return zkId;
257    }
258
259    /**
260     * Returns the {@link CuratorFramework} used for managing the ZooKeeper connection; it can be used by calling classes to perform
261     * more direct operations on ZooKeeper.  Most of the time, this shouldn't be needed.
262     * <p>
263     * Be careful not to close the connection.
264     *
265     * @return the CuratorFramework object
266     */
267    public CuratorFramework getClient() {
268        return client;
269    }
270
271    /**
272     * Returns the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
273     *
274     * @param oozies The collection of metadata provided by all of the Oozie Servers (from calling {@link ZKUtils#getAllMetaData())
275     * @return the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
276     */
277    public int getZKIdIndex(List<ServiceInstance<Map>> oozies) {
278        int index = 0;
279        // We don't actually have to sort all of the IDs, we can simply find out how many are before our zkId
280        for (ServiceInstance<Map> oozie : oozies) {
281            long otherRegTime = oozie.getRegistrationTimeUTC();
282            if (otherRegTime < zkRegTime) {
283                index++;
284            }
285        }
286        return index;
287    }
288
289    private void checkAndSetACLs() throws Exception {
290        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
291            // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
292            // and set the ACLs for them
293            // We can't get the namespace znode through curator; have to go through zk client
294            String namespace = "/" + client.getNamespace();
295            if (client.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) {
296                List<ACL> acls = client.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat());
297                if (!acls.get(0).getId().getScheme().equals("sasl")) {
298                    log.info("'sasl' ACLs not set; setting...");
299                    List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(namespace, null);
300                    for (String child : children) {
301                        checkAndSetACLs(child);
302                    }
303                    client.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1);
304                }
305            }
306        }
307    }
308
309    private void checkAndSetACLs(String path) throws Exception {
310        List<String> children = client.getChildren().forPath(path);
311        for (String child : children) {
312            checkAndSetACLs(path + "/" + child);
313        }
314        client.setACL().withACL(saslACL).forPath(path);
315    }
316
317    // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
318    private void setJaasConfiguration() throws ServiceException, IOException {
319        String keytabFile = Services.get().getConf().get(KERBEROS_KEYTAB, System.getProperty("user.home") + "/oozie.keytab").trim();
320        if (keytabFile.length() == 0) {
321            throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
322        }
323        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
324        if (principal.length() == 0) {
325            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
326        }
327
328        // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
329        // point to it (but this way we don't have to write a file, and it works better for the tests)
330        JaasConfiguration.addEntry("Client", principal, keytabFile);
331        Configuration.setConfiguration(JaasConfiguration.getInstance());
332    }
333
334    private String getServicePrincipal() throws ServiceException {
335        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
336        if (principal.length() == 0) {
337            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
338        }
339        return principal.split("[/@]")[0];
340    }
341
342    /**
343     * Useful for tests to get the registered classes
344     *
345     * @return the set of registered classes
346     */
347    @VisibleForTesting
348    public static Set<Object> getUsers() {
349        return users;
350    }
351
352    /**
353     * Keys used in the metadata provided by each Oozie Server to ZooKeeper and other Oozie Servers
354     */
355    public abstract class ZKMetadataKeys {
356        /**
357         * The ID of the Oozie Server
358         */
359        public static final String OOZIE_ID = "OOZIE_ID";
360        /**
361         * The URL of the Oozie Server
362         */
363        public static final String OOZIE_URL = "OOZIE_URL";
364    }
365
366    /**
367     * Simple implementation of an {@link ACLProvider} that simply returns {@link #saslACL}.
368     */
369    public class SASLOwnerACLProvider implements ACLProvider {
370
371        @Override
372        public List<ACL> getDefaultAcl() {
373            return saslACL;
374        }
375
376        @Override
377        public List<ACL> getAclForPath(String path) {
378            return saslACL;
379        }
380    }
381    
382    /**
383     * Returns configured zk namesapces
384     * @return oozie.zookeeper.namespace
385     */
386    public static String getZKNameSpace() {
387        return Services.get().getConf().get(ZK_NAMESPACE, "oozie");
388    }
389
390    /**
391     * Returns retry policy
392     *
393     * @return RetryPolicy
394     */
395    public static RetryPolicy getRetryPolicy() {
396        return new ExponentialBackoffRetry(1000, 3);
397    }
398}