/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.security.auth.login.Configuration;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.oozie.ErrorCode;

import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_KEYTAB;
import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_PRINCIPAL;

import org.apache.oozie.event.listener.ZKConnectionListener;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

/**
 * This class provides a singleton for interacting with ZooKeeper that other classes can use.  It handles connecting to ZooKeeper,
 * service discovery, and publishing metadata about this server.
 * <p>
 * Users of this class should call {@link ZKUtils#register(java.lang.Object)} to obtain the singleton.  This will ensure that we're
 * properly connected and ready to go with ZooKeeper.  When the user is done (i.e. on shutdown), it should call
 * {@link ZKUtils#unregister(java.lang.Object)} to let this class know; once there are no more users, this class will automatically
 * remove itself from ZooKeeper.
 * <p>
 * Each Oozie Server provides metadata that can be shared with the other Oozie Servers.  To keep things simple and to make it easy
 * to add additional metadata in the future, we share a Map.  The keys are defined in {@link ZKMetadataKeys}.
 * <p>
 * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is
 * /oozie/services/).  ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named
 * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload.  For example, with the default settings,
 * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata.
 * <p>
 * If oozie.zookeeper.secure is set to true, then Oozie will (a) use jaas to connect to ZooKeeper using SASL/Kerberos based on
 * Oozie's existing security configuration parameters (b) use/convert every znode under the namespace (including the namespace
 * itself) to have ACLs such that only Oozie servers have access (i.e. if "service/host@REALM" is the Kerberos principal, then
 * "service" will be used for the ACLs).
 * <p>
 * Oozie server will shutdown itself if ZK connection is lost for ${ZK_CONNECTION_TIMEOUT}.
 * <p>
 * Thread-safety: {@link #register(Object)} and {@link #unregister(Object)} both lock on {@code ZKUtils.class}, which is the
 * monitor guarding the static singleton and the set of registered users.
 */
public class ZKUtils {
    /**
     * oozie-site property for specifying the ZooKeeper connection string.  Comma-separated values of host:port pairs of the
     * ZooKeeper servers.
     */
    public static final String ZK_CONNECTION_STRING = "oozie.zookeeper.connection.string";
    /**
     * oozie-site property for specifying the ZooKeeper namespace to use (e.g. "oozie").  All of the Oozie servers that are planning
     * on talking to each other should have the same value for this.
     */
    public static final String ZK_NAMESPACE = "oozie.zookeeper.namespace";

    /**
     * oozie-site property for specifying the ZK connection timeout (in seconds).
     */
    public static final String ZK_CONNECTION_TIMEOUT = "oozie.zookeeper.connection.timeout";

    /**
     * oozie-site property for specifying the ZK session timeout (in seconds; 300 is used when the property is not set).  If
     * connection is lost after retry, then Oozie server will shutdown itself.
     */
    public static final String ZK_SESSION_TIMEOUT = "oozie.zookeeper.session.timeout";

    /**
     * oozie-site property for specifying the maximum number of times to retry (10 is used when the property is not set).
     */
    public static final String ZK_MAX_RETRIES = "oozie.zookeeper.max.retries";

    /**
     * oozie-env environment variable for specifying the Oozie instance ID
     */
    public static final String OOZIE_INSTANCE_ID = "oozie.instance.id";

    /**
     * oozie-site property for specifying that ZooKeeper is secure.
     */
    public static final String ZK_SECURE = "oozie.zookeeper.secure";

    private static final String ZK_OOZIE_SERVICE = "servers";
    /**
     * Services that need to put a node in zookeeper should go under here.  Try to keep this area clean and organized.
     */
    public static final String ZK_BASE_SERVICES_PATH = "/services";

    // Both static fields below are guarded by the ZKUtils.class monitor (see register/unregister)
    private static final Set<Object> users = new HashSet<Object>();
    private static ZKUtils zk = null;

    private CuratorFramework client = null;
    private String zkId;
    private long zkRegTime;
    private ServiceDiscovery<Map> sDiscovery;
    private ServiceCache<Map> sCache;
    private List<ACL> saslACL;
    private XLog log;

    /**
     * Private Constructor for the singleton; it connects to ZooKeeper and advertises this Oozie Server.
     * <p>
     * If advertising or ACL conversion fails after the client has been started, everything that was started is torn down again
     * before the exception is propagated.
     *
     * @throws Exception if connecting to ZooKeeper, advertising this server, or setting the ACLs fails
     */
    private ZKUtils() throws Exception {
        log = XLog.getLog(getClass());
        zkId = ConfigurationService.get(OOZIE_INSTANCE_ID);
        if (zkId.isEmpty()) {
            // No explicit instance id configured; fall back to the configured hostname
            zkId = ConfigurationService.get("oozie.http.hostname");
        }
        createClient();
        try {
            advertiseService();
            checkAndSetACLs();
        }
        catch (Exception ex) {
            // The client is already started at this point; without this cleanup a failed initialization would leak the
            // ZooKeeper connection (and any started discovery/cache) because no instance is ever handed out to close it.
            teardown();
            throw ex;
        }
    }

    /**
     * Classes that want to use ZooKeeper should call this method to get the ZKUtils singleton.
     *
     * @param user The calling class
     * @return the ZKUtils singleton
     * @throws Exception if there is an issue when connecting to ZooKeeper
     */
    public static synchronized ZKUtils register(Object user) throws Exception {
        if (zk == null) {
            zk = new ZKUtils();
        }
        // Remember the calling class so we can disconnect when everybody is done
        users.add(user);
        return zk;
    }

    /**
     * Classes should call this when they are done (i.e. shutdown).  Once the last registered user is gone, the singleton is torn
     * down and discarded.
     *
     * @param user The calling class
     */
    public void unregister(Object user) {
        // Lock on the class, not on this instance: register() is a static synchronized method and therefore holds the
        // ZKUtils.class monitor while it touches the same static state (users, zk).
        synchronized (ZKUtils.class) {
            users.remove(user);
            // If there are no more classes using ZooKeeper, we should teardown everything.
            if (users.isEmpty() && zk != null) {
                zk.teardown();
                zk = null;
            }
        }
    }

    /**
     * Builds and starts the Curator client from the oozie-site settings, using SASL/Kerberos and 'sasl' ACLs when
     * {@link #ZK_SECURE} is enabled, and registers a {@link ZKConnectionListener} on it.
     *
     * @throws Exception if the JAAS configuration or the client cannot be set up
     */
    private void createClient() throws Exception {
        // Connect to the ZooKeeper server
        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
        String zkConnectionString = ConfigurationService.get(ZK_CONNECTION_STRING);
        String zkNamespace = getZKNameSpace();
        int zkConnectionTimeout = ConfigurationService.getInt(ZK_CONNECTION_TIMEOUT);
        int zkSessionTimeout = ConfigurationService.getInt(ZK_SESSION_TIMEOUT, 300);

        ACLProvider aclProvider;
        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
            log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
            setJaasConfiguration();
            System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
            System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
            saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal())));
            aclProvider = new SASLOwnerACLProvider();
        }
        else {
            log.info("Connecting to ZooKeeper without authentication");
            aclProvider = new DefaultACLProvider();     // open to everyone
        }
        client = CuratorFrameworkFactory.builder()
                .namespace(zkNamespace)
                .connectString(zkConnectionString)
                .retryPolicy(retryPolicy)
                .aclProvider(aclProvider)
                .connectionTimeoutMs(zkConnectionTimeout * 1000) // in ms
                .sessionTimeoutMs(zkSessionTimeout * 1000) // in ms
                .build();
        client.start();
        client.getConnectionStateListenable().addListener(new ZKConnectionListener());
    }

    /**
     * Registers this server under the {@link #ZK_OOZIE_SERVICE} service, starts the service discovery cache, and records this
     * server's registration time.
     *
     * @throws Exception if any of the ZooKeeper/Curator operations fail
     */
    private void advertiseService() throws Exception {
        // Advertise on the service discovery
        new EnsurePath(ZK_BASE_SERVICES_PATH).ensure(client.getZookeeperClient());
        InstanceSerializer<Map> instanceSerializer = new FixedJsonInstanceSerializer<Map>(Map.class);
        sDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
                .basePath(ZK_BASE_SERVICES_PATH)
                .client(client)
                .serializer(instanceSerializer)
                .build();
        sDiscovery.start();
        sDiscovery.registerService(getMetadataInstance());

        // Create the service discovery cache
        sCache = sDiscovery.serviceCacheBuilder().name(ZK_OOZIE_SERVICE).build();
        sCache.start();

        zkRegTime = sDiscovery.queryForInstance(ZK_OOZIE_SERVICE, zkId).getRegistrationTimeUTC();
    }

    /**
     * Stops the service discovery cache and removes this server from the service discovery.  Components that were never
     * created (initialization failed part-way through) are skipped.
     *
     * @throws Exception if closing the cache or unregistering/closing the service discovery fails
     */
    private void unadvertiseService() throws Exception {
        // Stop the service discovery cache
        if (sCache != null) {
            sCache.close();
        }

        // Unadvertise on the service discovery
        if (sDiscovery != null) {
            sDiscovery.unregisterService(getMetadataInstance());
            sDiscovery.close();
        }
    }

    /**
     * Unadvertises this server (best effort; failures are only logged) and closes the ZooKeeper client.
     */
    private void teardown() {
        try {
            unadvertiseService();
        }
        catch (Exception ex) {
            log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex);
        }
        // Always release the connection, even if unadvertising failed
        if (client != null) {
            client.close();
            client = null;
        }
    }

    /**
     * Creates the metadata that this server is providing to ZooKeeper and other Oozie Servers.
     *
     * @return the service instance carrying this server's ID and URL as its payload
     * @throws Exception if the service instance cannot be built
     */
    private ServiceInstance<Map> getMetadataInstance() throws Exception {
        String url = ConfigUtils.getOozieEffectiveUrl();
        Map<String, String> map = new HashMap<String, String>();
        map.put(ZKMetadataKeys.OOZIE_ID, zkId);
        map.put(ZKMetadataKeys.OOZIE_URL, url);

        return ServiceInstance.<Map>builder()
                .name(ZK_OOZIE_SERVICE)
                .id(zkId)
                .payload(map)
                .build();
    }

    /**
     * Returns a list of the metadata provided by all of the Oozie Servers.  Note that the metadata is cached so it may be a second
     * or two stale.
     *
     * @return a List of the metadata provided by all of the Oozie Servers, or {@code null} if the service discovery cache has
     * not been created
     */
    public List<ServiceInstance<Map>> getAllMetaData() {
        List<ServiceInstance<Map>> instances = null;
        if (sCache != null) {
            instances = sCache.getInstances();
        }
        return instances;
    }

    /**
     * Returns the ID of this Oozie Server as seen by ZooKeeper and other Oozie Servers
     *
     * @return the ID of this Oozie Server
     */
    public String getZKId() {
        return zkId;
    }

    /**
     * Returns the {@link CuratorFramework} used for managing the ZooKeeper connection; it can be used by calling classes to perform
     * more direct operations on ZooKeeper.  Most of the time, this shouldn't be needed.
     * <p>
     * Be careful not to close the connection.
     *
     * @return the CuratorFramework object
     */
    public CuratorFramework getClient() {
        return client;
    }

    /**
     * Returns the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
     *
     * @param oozies The collection of metadata provided by all of the Oozie Servers (from calling {@link ZKUtils#getAllMetaData()})
     * @return the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
     */
    public int getZKIdIndex(List<ServiceInstance<Map>> oozies) {
        int index = 0;
        // We don't actually have to sort all of the IDs, we can simply find out how many are before our zkId
        for (ServiceInstance<Map> oozie : oozies) {
            long otherRegTime = oozie.getRegistrationTimeUTC();
            if (otherRegTime < zkRegTime) {
                index++;
            }
        }
        return index;
    }

    /**
     * When {@link #ZK_SECURE} is enabled, makes sure the namespace znode and everything below it carry the 'sasl' ACLs.
     *
     * @throws Exception if reading or setting the ACLs fails
     */
    private void checkAndSetACLs() throws Exception {
        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
            // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
            // and set the ACLs for them
            // We can't get the namespace znode through curator; have to go through zk client
            String namespace = "/" + client.getNamespace();
            if (client.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) {
                List<ACL> acls = client.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat());
                // Only the first ACL entry is inspected; an empty list is treated the same as "not sasl"
                if (acls.isEmpty() || !acls.get(0).getId().getScheme().equals("sasl")) {
                    log.info("'sasl' ACLs not set; setting...");
                    List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(namespace, null);
                    for (String child : children) {
                        checkAndSetACLs("/" + child);
                    }
                    // The namespace znode itself is updated last, after all of its descendants
                    client.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1);
                }
            }
        }
    }

    /**
     * Recursively sets the 'sasl' ACLs on the given znode and all of its descendants (children first).
     *
     * @param path the znode path, relative to the client's namespace
     * @throws Exception if listing the children or setting the ACLs fails
     */
    private void checkAndSetACLs(String path) throws Exception {
        List<String> children = client.getChildren().forPath(path);
        for (String child : children) {
            checkAndSetACLs(path + "/" + child);
        }
        client.setACL().withACL(saslACL).forPath(path);
    }

    // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
    private void setJaasConfiguration() throws ServiceException, IOException {
        String keytabFile = Services.get().getConf().get(KERBEROS_KEYTAB, System.getProperty("user.home") + "/oozie.keytab").trim();
        if (keytabFile.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
        }
        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
        if (principal.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
        }

        // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
        // point to it (but this way we don't have to write a file, and it works better for the tests)
        JaasConfiguration.addEntry("Client", principal, keytabFile);
        Configuration.setConfiguration(JaasConfiguration.getInstance());
    }

    /**
     * Returns the service part of the configured Kerberos principal (i.e. "service" for "service/host@REALM").
     *
     * @return the first component of the Kerberos principal
     * @throws ServiceException if the configured principal is empty
     */
    private String getServicePrincipal() throws ServiceException {
        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
        if (principal.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
        }
        return principal.split("[/@]")[0];
    }

    /**
     * Useful for tests to get the registered classes
     *
     * @return the set of registered classes
     */
    @VisibleForTesting
    public static Set<Object> getUsers() {
        return users;
    }

    /**
     * Keys used in the metadata provided by each Oozie Server to ZooKeeper and other Oozie Servers
     */
    public abstract class ZKMetadataKeys {
        /**
         * The ID of the Oozie Server
         */
        public static final String OOZIE_ID = "OOZIE_ID";
        /**
         * The URL of the Oozie Server
         */
        public static final String OOZIE_URL = "OOZIE_URL";
    }

    /**
     * Simple implementation of an {@link ACLProvider} that simply returns {@link #saslACL}.
     */
    public class SASLOwnerACLProvider implements ACLProvider {

        @Override
        public List<ACL> getDefaultAcl() {
            return saslACL;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            return saslACL;
        }
    }

    /**
     * Returns the retry policy used for the ZooKeeper connection: exponential backoff starting at 1000 ms, retried at most
     * {@link #ZK_MAX_RETRIES} times (10 if not configured).
     *
     * @return RetryPolicy
     */
    public static RetryPolicy getRetryPolicy() {
        return new ExponentialBackoffRetry(1000, ConfigurationService.getInt(ZK_MAX_RETRIES, 10));
    }

    /**
     * Returns the configured ZooKeeper namespace
     *
     * @return the value of oozie.zookeeper.namespace
     */
    public static String getZKNameSpace() {
        return ConfigurationService.get(ZK_NAMESPACE);
    }
}