001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.util; 019 020import com.google.common.annotations.VisibleForTesting; 021 022import java.io.IOException; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import javax.security.auth.login.Configuration; 031 032import org.apache.curator.RetryPolicy; 033import org.apache.curator.framework.CuratorFramework; 034import org.apache.curator.framework.CuratorFrameworkFactory; 035import org.apache.curator.framework.api.ACLProvider; 036import org.apache.curator.framework.imps.DefaultACLProvider; 037import org.apache.curator.framework.state.ConnectionState; 038import org.apache.curator.retry.ExponentialBackoffRetry; 039import org.apache.curator.utils.EnsurePath; 040import org.apache.curator.x.discovery.ServiceCache; 041import org.apache.curator.x.discovery.ServiceDiscovery; 042import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; 043import org.apache.curator.x.discovery.ServiceInstance; 044import org.apache.curator.x.discovery.details.InstanceSerializer; 045import org.apache.oozie.ErrorCode; 046 047import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_KEYTAB; 048import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_PRINCIPAL; 049 050import org.apache.oozie.event.listener.ZKConnectionListener; 051import org.apache.oozie.service.ConfigurationService; 052import org.apache.oozie.service.ServiceException; 053import org.apache.oozie.service.Services; 054import org.apache.zookeeper.ZooDefs.Perms; 055import org.apache.zookeeper.client.ZooKeeperSaslClient; 056import org.apache.zookeeper.data.ACL; 057import org.apache.zookeeper.data.Id; 058import org.apache.zookeeper.data.Stat; 059 060/** 061 * This class provides a singleton for interacting with ZooKeeper that other classes can use. It handles connecting to ZooKeeper, 062 * service discovery, and publishing metadata about this server. 063 * <p> 064 * Users of this class should call {@link ZKUtils#register(java.lang.Object)} to obtain the singleton. This will ensure that we're 065 * properly connected and ready to go with ZooKeeper. When the user is done (i.e. on shutdown), it should call 066 * {@link ZKUtils#unregister(java.lang.Object)} to let this class know; once there are no more users, this class will automatically 067 * remove itself from ZooKeeper. 068 * <p> 069 * Each Oozie Server provides metadata that can be shared with the other Oozie Servers. To keep things simple and to make it easy 070 * to add additional metadata in the future, we share a Map. They keys are defined in {@link ZKMetadataKeys}. 071 * <p> 072 * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is 073 * /oozie/services/). ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named 074 * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload. For example, with the default settings, 075 * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata. 076 * <p> 077 * If oozie.zookeeper.secure is set to true, then Oozie will (a) use jaas to connect to ZooKeeper using SASL/Kerberos based on 078 * Oozie's existing security configuration parameters (b) use/convert every znode under the namespace (including the namespace 079 * itself) to have ACLs such that only Oozie servers have access (i.e. if "service/host@REALM" is the Kerberos principal, then 080 * "service" will be used for the ACLs). 081 * <p> 082 * Oozie server will shutdown itself if ZK connection is lost for ${ZK_CONNECTION_TIMEOUT}. 083 */ 084public class ZKUtils { 085 /** 086 * oozie-site property for specifying the ZooKeeper connection string. Comma-separated values of host:port pairs of the 087 * ZooKeeper servers. 088 */ 089 public static final String ZK_CONNECTION_STRING = "oozie.zookeeper.connection.string"; 090 /** 091 * oozie-site property for specifying the ZooKeeper namespace to use (e.g. "oozie"). All of the Oozie servers that are planning 092 * on talking to each other should have the same value for this. 093 */ 094 public static final String ZK_NAMESPACE = "oozie.zookeeper.namespace"; 095 096 /** 097 *Default ZK connection timeout ( in sec). 098 */ 099 public static final String ZK_CONNECTION_TIMEOUT = "oozie.zookeeper.connection.timeout"; 100 101 /** 102 *Default ZK session timeout ( in sec). If connection is lost after retry, then Oozie server will shutdown itself. 103 */ 104 public static final String ZK_SESSION_TIMEOUT = "oozie.zookeeper.session.timeout"; 105 106 /** 107 * Maximum number of times to retry. 108 */ 109 public static final String ZK_MAX_RETRIES = "oozie.zookeeper.max.retries"; 110 111 /** 112 * oozie-env environment variable for specifying the Oozie instance ID 113 */ 114 public static final String OOZIE_INSTANCE_ID = "oozie.instance.id"; 115 116 /** 117 * oozie-site property for specifying that ZooKeeper is secure. 118 */ 119 public static final String ZK_SECURE = "oozie.zookeeper.secure"; 120 121 private static final String ZK_OOZIE_SERVICE = "servers"; 122 /** 123 * Services that need to put a node in zookeeper should go under here. Try to keep this area clean and organized. 124 */ 125 public static final String ZK_BASE_SERVICES_PATH = "/services"; 126 127 private static Set<Object> users = new HashSet<Object>(); 128 private CuratorFramework client = null; 129 private String zkId; 130 private long zkRegTime; 131 private ServiceDiscovery<Map> sDiscovery; 132 private ServiceCache<Map> sCache; 133 private List<ACL> saslACL; 134 private XLog log; 135 136 private static ZKUtils zk = null; 137 138 139 /** 140 * Private Constructor for the singleton; it connects to ZooKeeper and advertises this Oozie Server. 141 * 142 * @throws Exception 143 */ 144 private ZKUtils() throws Exception { 145 log = XLog.getLog(getClass()); 146 zkId = ConfigurationService.get(OOZIE_INSTANCE_ID); 147 if (zkId.isEmpty()) { 148 zkId = ConfigurationService.get("oozie.http.hostname"); 149 } 150 createClient(); 151 advertiseService(); 152 checkAndSetACLs(); 153 } 154 155 /** 156 * Classes that want to use ZooKeeper should call this method to get the ZKUtils singleton. 157 * 158 * @param user The calling class 159 * @return the ZKUtils singleton 160 * @throws Exception 161 */ 162 public static synchronized ZKUtils register(Object user) throws Exception { 163 if (zk == null) { 164 zk = new ZKUtils(); 165 } 166 // Remember the calling class so we can disconnect when everybody is done 167 users.add(user); 168 return zk; 169 } 170 171 /** 172 * Classes should call this when they are done (i.e. shutdown). 173 * 174 * @param user The calling class 175 */ 176 public synchronized void unregister(Object user) { 177 // If there are no more classes using ZooKeeper, we should teardown everything. 178 users.remove(user); 179 if (users.isEmpty() && zk != null) { 180 if (ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) { 181 zk.teardown(); 182 } 183 zk = null; 184 } 185 } 186 187 private void createClient() throws Exception { 188 // Connect to the ZooKeeper server 189 RetryPolicy retryPolicy = ZKUtils.getRetryPolicy(); 190 String zkConnectionString = ConfigurationService.get(ZK_CONNECTION_STRING); 191 String zkNamespace = getZKNameSpace(); 192 int zkConnectionTimeout = ConfigurationService.getInt(ZK_CONNECTION_TIMEOUT); 193 int zkSessionTimeout = ConfigurationService.getInt(ZK_SESSION_TIMEOUT, 300); 194 195 ACLProvider aclProvider; 196 if (Services.get().getConf().getBoolean(ZK_SECURE, false)) { 197 log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs"); 198 setJaasConfiguration(); 199 System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client"); 200 System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"); 201 saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal()))); 202 aclProvider = new SASLOwnerACLProvider(); 203 } else { 204 log.info("Connecting to ZooKeeper without authentication"); 205 aclProvider = new DefaultACLProvider(); // open to everyone 206 } 207 client = CuratorFrameworkFactory.builder() 208 .namespace(zkNamespace) 209 .connectString(zkConnectionString) 210 .retryPolicy(retryPolicy) 211 .aclProvider(aclProvider) 212 .connectionTimeoutMs(zkConnectionTimeout * 1000) // in ms 213 .sessionTimeoutMs(zkSessionTimeout * 1000) //in ms 214 .build(); 215 client.start(); 216 client.getConnectionStateListenable().addListener(new ZKConnectionListener()); 217 } 218 219 private void advertiseService() throws Exception { 220 // Advertise on the service discovery 221 new EnsurePath(ZK_BASE_SERVICES_PATH).ensure(client.getZookeeperClient()); 222 InstanceSerializer<Map> instanceSerializer = new FixedJsonInstanceSerializer<Map>(Map.class); 223 sDiscovery = ServiceDiscoveryBuilder.builder(Map.class) 224 .basePath(ZK_BASE_SERVICES_PATH) 225 .client(client) 226 .serializer(instanceSerializer) 227 .build(); 228 sDiscovery.start(); 229 sDiscovery.registerService(getMetadataInstance()); 230 231 // Create the service discovery cache 232 sCache = sDiscovery.serviceCacheBuilder().name(ZK_OOZIE_SERVICE).build(); 233 sCache.start(); 234 235 zkRegTime = sDiscovery.queryForInstance(ZK_OOZIE_SERVICE, zkId).getRegistrationTimeUTC(); 236 } 237 238 private void unadvertiseService() throws Exception { 239 // Stop the service discovery cache 240 sCache.close(); 241 242 // Unadvertise on the service discovery 243 sDiscovery.unregisterService(getMetadataInstance()); 244 sDiscovery.close(); 245 } 246 247 private void teardown() { 248 try { 249 zk.unadvertiseService(); 250 } 251 catch (Exception ex) { 252 log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex); 253 } 254 client.close(); 255 client = null; 256 } 257 258 private ServiceInstance<Map> getMetadataInstance() throws Exception { 259 // Creates the metadata that this server is providing to ZooKeeper and other Oozie Servers 260 String url = ConfigUtils.getOozieEffectiveUrl(); 261 Map<String, String> map = new HashMap<String, String>(); 262 map.put(ZKMetadataKeys.OOZIE_ID, zkId); 263 map.put(ZKMetadataKeys.OOZIE_URL, url); 264 265 return ServiceInstance.<Map>builder() 266 .name(ZK_OOZIE_SERVICE) 267 .id(zkId) 268 .payload(map) 269 .build(); 270 } 271 272 /** 273 * Returns a list of the metadata provided by all of the Oozie Servers. Note that the metadata is cached so it may be a second 274 * or two stale. 275 * 276 * @return a List of the metadata provided by all of the Oozie Servers. 277 */ 278 public List<ServiceInstance<Map>> getAllMetaData() { 279 List<ServiceInstance<Map>> instances = null; 280 if (sCache != null) { 281 instances = sCache.getInstances(); 282 } 283 return instances; 284 } 285 286 /** 287 * Returns the ID of this Oozie Server as seen by ZooKeeper and other Oozie Servers 288 * 289 * @return the ID of this Oozie Server 290 */ 291 public String getZKId() { 292 return zkId; 293 } 294 295 /** 296 * Returns the {@link CuratorFramework} used for managing the ZooKeeper connection; it can be used by calling classes to perform 297 * more direct operations on ZooKeeper. Most of the time, this shouldn't be needed. 298 * <p> 299 * Be careful not to close the connection. 300 * 301 * @return the CuratorFramework object 302 */ 303 public CuratorFramework getClient() { 304 return client; 305 } 306 307 /** 308 * Returns the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time) 309 * 310 * @param oozies The collection of metadata provided by all of the Oozie Servers (from calling {@link ZKUtils#getAllMetaData()} 311 * @return the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time) 312 */ 313 public int getZKIdIndex(List<ServiceInstance<Map>> oozies) { 314 int index = 0; 315 // We don't actually have to sort all of the IDs, we can simply find out how many are before our zkId 316 for (ServiceInstance<Map> oozie : oozies) { 317 long otherRegTime = oozie.getRegistrationTimeUTC(); 318 if (otherRegTime < zkRegTime) { 319 index++; 320 } 321 } 322 return index; 323 } 324 325 private void checkAndSetACLs() throws Exception { 326 if (Services.get().getConf().getBoolean(ZK_SECURE, false)) { 327 // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes 328 // and set the ACLs for them 329 // We can't get the namespace znode through curator; have to go through zk client 330 String namespace = "/" + client.getNamespace(); 331 if (client.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) { 332 List<ACL> acls = client.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat()); 333 if (!acls.get(0).getId().getScheme().equals("sasl")) { 334 log.info("'sasl' ACLs not set; setting..."); 335 List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(namespace, null); 336 for (String child : children) { 337 checkAndSetACLs("/" + child); 338 } 339 client.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1); 340 } 341 } 342 } 343 } 344 345 private void checkAndSetACLs(String path) throws Exception { 346 List<String> children = client.getChildren().forPath(path); 347 for (String child : children) { 348 checkAndSetACLs(path + "/" + child); 349 } 350 client.setACL().withACL(saslACL).forPath(path); 351 } 352 353 // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer() 354 private void setJaasConfiguration() throws ServiceException, IOException { 355 String keytabFile = Services.get().getConf().get(KERBEROS_KEYTAB, System.getProperty("user.home") + "/oozie.keytab").trim(); 356 if (keytabFile.length() == 0) { 357 throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB); 358 } 359 String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST"); 360 if (principal.length() == 0) { 361 throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL); 362 } 363 364 // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to 365 // point to it (but this way we don't have to write a file, and it works better for the tests) 366 JaasConfiguration.addEntry("Client", principal, keytabFile); 367 Configuration.setConfiguration(JaasConfiguration.getInstance()); 368 } 369 370 private String getServicePrincipal() throws ServiceException { 371 String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST"); 372 if (principal.length() == 0) { 373 throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL); 374 } 375 return principal.split("[/@]")[0]; 376 } 377 378 /** 379 * Useful for tests to get the registered classes 380 * 381 * @return the set of registered classes 382 */ 383 @VisibleForTesting 384 public static Set<Object> getUsers() { 385 return users; 386 } 387 388 /** 389 * Keys used in the metadata provided by each Oozie Server to ZooKeeper and other Oozie Servers 390 */ 391 public abstract class ZKMetadataKeys { 392 /** 393 * The ID of the Oozie Server 394 */ 395 public static final String OOZIE_ID = "OOZIE_ID"; 396 /** 397 * The URL of the Oozie Server 398 */ 399 public static final String OOZIE_URL = "OOZIE_URL"; 400 } 401 402 /** 403 * Simple implementation of an {@link ACLProvider} that simply returns {@link #saslACL}. 404 */ 405 public class SASLOwnerACLProvider implements ACLProvider { 406 407 @Override 408 public List<ACL> getDefaultAcl() { 409 return saslACL; 410 } 411 412 @Override 413 public List<ACL> getAclForPath(String path) { 414 return saslACL; 415 } 416 } 417 418 /** 419 * Returns retry policy 420 * 421 * @return RetryPolicy 422 */ 423 public static RetryPolicy getRetryPolicy() { 424 return new ExponentialBackoffRetry(1000, ConfigurationService.getInt(ZK_MAX_RETRIES, 10)); 425 } 426 427 /** 428 * Returns configured zk namesapces 429 * @return oozie.zookeeper.namespace 430 */ 431 public static String getZKNameSpace() { 432 return ConfigurationService.get(ZK_NAMESPACE); 433 } 434}