/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.security.auth.login.Configuration;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.oozie.ErrorCode;

import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_KEYTAB;
import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_PRINCIPAL;

import org.apache.oozie.event.listener.ZKConnectionListener;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

/**
 * This class provides a singleton for interacting with ZooKeeper that other classes can use.  It handles connecting to ZooKeeper,
 * service discovery, and publishing metadata about this server.
 * <p>
 * Users of this class should call {@link ZKUtils#register(java.lang.Object)} to obtain the singleton.  This will ensure that we're
 * properly connected and ready to go with ZooKeeper.  When the user is done (i.e. on shutdown), it should call
 * {@link ZKUtils#unregister(java.lang.Object)} to let this class know; once there are no more users, this class will automatically
 * remove itself from ZooKeeper.
 * <p>
 * Each Oozie Server provides metadata that can be shared with the other Oozie Servers.  To keep things simple and to make it easy
 * to add additional metadata in the future, we share a Map.  The keys are defined in {@link ZKMetadataKeys}.
 * <p>
 * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is
 * /oozie/services/).  ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named
 * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload.  For example, with the default settings,
 * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata.
 * <p>
 * If oozie.zookeeper.secure is set to true, then Oozie will (a) use jaas to connect to ZooKeeper using SASL/Kerberos based on
 * Oozie's existing security configuration parameters (b) use/convert every znode under the namespace (including the namespace
 * itself) to have ACLs such that only Oozie servers have access (i.e. if "service/host@REALM" is the Kerberos principal, then
 * "service" will be used for the ACLs).
 * <p>
 * Oozie server will shutdown itself if ZK connection is lost for ${ZK_CONNECTION_TIMEOUT}.
 * <p>
 * The static registration state ({@link #register(Object)} / {@link #unregister(Object)}) is guarded by the {@code ZKUtils.class}
 * monitor.
 */
public class ZKUtils {
    /**
     * oozie-site property for specifying the ZooKeeper connection string.  Comma-separated values of host:port pairs of the
     * ZooKeeper servers.
     */
    public static final String ZK_CONNECTION_STRING = "oozie.zookeeper.connection.string";
    /**
     * oozie-site property for specifying the ZooKeeper namespace to use (e.g. "oozie").  All of the Oozie servers that are planning
     * on talking to each other should have the same value for this.
     */
    public static final String ZK_NAMESPACE = "oozie.zookeeper.namespace";

    /**
     * oozie-site property for specifying the ZooKeeper connection timeout (in seconds).  If the connection is lost for more than
     * this timeout, then the Oozie server will shut itself down.
     */
    public static final String ZK_CONNECTION_TIMEOUT = "oozie.zookeeper.connection.timeout";

    /**
     * oozie-env environment variable for specifying the Oozie instance ID
     */
    public static final String OOZIE_INSTANCE_ID = "oozie.instance.id";

    /**
     * oozie-site property for specifying that ZooKeeper is secure.
     */
    public static final String ZK_SECURE = "oozie.zookeeper.secure";

    private static final String ZK_OOZIE_SERVICE = "servers";
    /**
     * Services that need to put a node in zookeeper should go under here.  Try to keep this area clean and organized.
     */
    public static final String ZK_BASE_SERVICES_PATH = "/services";

    // Registered users of the singleton; only touched while holding the ZKUtils.class monitor (or by tests via getUsers())
    private static final Set<Object> users = new HashSet<Object>();
    private CuratorFramework client = null;
    private String zkId;
    private long zkRegTime;
    private ServiceDiscovery<Map> sDiscovery;
    private ServiceCache<Map> sCache;
    private List<ACL> saslACL;
    private XLog log;

    private static ZKUtils zk = null;
    private static int zkConnectionTimeout;

    /**
     * Private Constructor for the singleton; it connects to ZooKeeper and advertises this Oozie Server.
     * <p>
     * If any initialization step fails, everything that was already started (cache, service discovery, client) is closed again
     * before the exception is propagated, so a failed attempt does not leave a started client behind.
     *
     * @throws Exception if connecting to ZooKeeper, advertising this server, or setting the ACLs fails
     */
    private ZKUtils() throws Exception {
        log = XLog.getLog(getClass());
        zkId = ConfigurationService.get(OOZIE_INSTANCE_ID);
        // Fall back to the hostname when no explicit instance id is configured
        if (zkId == null || zkId.isEmpty()) {
            zkId = ConfigurationService.get("oozie.http.hostname");
        }
        try {
            createClient();
            advertiseService();
            checkAndSetACLs();
        }
        catch (Exception ex) {
            closeAfterFailedInit();
            throw ex;
        }
    }

    /**
     * Best-effort release of whatever the constructor managed to start before it failed.  Failures while closing are only logged
     * so that the original initialization exception is the one that reaches the caller.
     */
    private void closeAfterFailedInit() {
        if (sCache != null) {
            try {
                sCache.close();
            }
            catch (Exception ex) {
                log.warn("Exception occurred while closing service cache: " + ex.getMessage(), ex);
            }
            sCache = null;
        }
        if (sDiscovery != null) {
            try {
                sDiscovery.close();
            }
            catch (Exception ex) {
                log.warn("Exception occurred while closing service discovery: " + ex.getMessage(), ex);
            }
            sDiscovery = null;
        }
        if (client != null) {
            try {
                client.close();
            }
            catch (Exception ex) {
                log.warn("Exception occurred while closing ZooKeeper client: " + ex.getMessage(), ex);
            }
            client = null;
        }
    }

    /**
     * Classes that want to use ZooKeeper should call this method to get the ZKUtils singleton.
     *
     * @param user The calling class
     * @return the ZKUtils singleton
     * @throws Exception if the singleton has to be created and connecting to or registering with ZooKeeper fails
     */
    public static synchronized ZKUtils register(Object user) throws Exception {
        if (zk == null) {
            zk = new ZKUtils();
        }
        // Remember the calling class so we can disconnect when everybody is done
        users.add(user);
        return zk;
    }

    /**
     * Classes should call this when they are done (i.e. shutdown).
     *
     * @param user The calling class
     */
    public void unregister(Object user) {
        // 'users' and 'zk' are static and register() is a static synchronized method, i.e. it locks ZKUtils.class.  Lock the same
        // monitor here; an instance-level lock would not exclude a concurrent register().
        synchronized (ZKUtils.class) {
            // If there are no more classes using ZooKeeper, we should teardown everything.
            users.remove(user);
            if (users.isEmpty() && zk != null) {
                if (ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
                    zk.teardown();
                }
                zk = null;
            }
        }
    }

    /**
     * Builds and starts the Curator client from the oozie-site settings, using 'sasl' ACLs and a JAAS configuration when
     * {@link #ZK_SECURE} is enabled, and registers a {@link ZKConnectionListener} on it.
     *
     * @throws Exception if the secure configuration is invalid or the client cannot be created
     */
    private void createClient() throws Exception {
        // Connect to the ZooKeeper server
        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
        String zkConnectionString = ConfigurationService.get(ZK_CONNECTION_STRING);
        String zkNamespace = getZKNameSpace();
        zkConnectionTimeout = ConfigurationService.getInt(ZK_CONNECTION_TIMEOUT);

        ACLProvider aclProvider;
        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
            log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
            setJaasConfiguration();
            System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
            System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
            saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal())));
            aclProvider = new SASLOwnerACLProvider();
        } else {
            log.info("Connecting to ZooKeeper without authentication");
            aclProvider = new DefaultACLProvider();     // open to everyone
        }
        client = CuratorFrameworkFactory.builder()
                                            .namespace(zkNamespace)
                                            .connectString(zkConnectionString)
                                            .retryPolicy(retryPolicy)
                                            .aclProvider(aclProvider)
                                            .connectionTimeoutMs(zkConnectionTimeout * 1000) // in ms
                                            .build();
        client.start();
        client.getConnectionStateListenable().addListener(new ZKConnectionListener());
    }

    /**
     * Registers this server's metadata under the "servers" service, starts the service discovery cache and records this
     * server's registration time.
     *
     * @throws Exception if any of the ZooKeeper/service discovery operations fails
     */
    private void advertiseService() throws Exception {
        // Advertise on the service discovery
        new EnsurePath(ZK_BASE_SERVICES_PATH).ensure(client.getZookeeperClient());
        InstanceSerializer<Map> instanceSerializer = new FixedJsonInstanceSerializer<Map>(Map.class);
        sDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
                                                .basePath(ZK_BASE_SERVICES_PATH)
                                                .client(client)
                                                .serializer(instanceSerializer)
                                                .build();
        sDiscovery.start();
        sDiscovery.registerService(getMetadataInstance());

        // Create the service discovery cache
        sCache = sDiscovery.serviceCacheBuilder().name(ZK_OOZIE_SERVICE).build();
        sCache.start();

        zkRegTime = sDiscovery.queryForInstance(ZK_OOZIE_SERVICE, zkId).getRegistrationTimeUTC();
    }

    /**
     * Stops the service discovery cache and removes this server from the service discovery.  Each step is attempted even if an
     * earlier one throws; the exception of the last failing step is the one that propagates.
     *
     * @throws Exception if closing the cache, unregistering, or closing the service discovery fails
     */
    private void unadvertiseService() throws Exception {
        try {
            // Stop the service discovery cache
            sCache.close();
        }
        finally {
            try {
                // Unadvertise on the service discovery
                sDiscovery.unregisterService(getMetadataInstance());
            }
            finally {
                sDiscovery.close();
            }
        }
    }

    /**
     * Unadvertises this server (failures are logged, not propagated) and closes the ZooKeeper client.
     */
    private void teardown() {
        try {
            unadvertiseService();
        }
        catch (Exception ex) {
            log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex);
        }
        client.close();
        client = null;
    }

    /**
     * Builds the service instance (name "servers", id {@link #getZKId()}) whose payload is this server's metadata Map.
     *
     * @return the service instance describing this Oozie Server
     * @throws Exception if the service instance cannot be built
     */
    private ServiceInstance<Map> getMetadataInstance() throws Exception {
        // Creates the metadata that this server is providing to ZooKeeper and other Oozie Servers
        String url = ConfigUtils.getOozieEffectiveUrl();
        Map<String, String> map = new HashMap<String, String>();
        map.put(ZKMetadataKeys.OOZIE_ID, zkId);
        map.put(ZKMetadataKeys.OOZIE_URL, url);

        return ServiceInstance.<Map>builder()
            .name(ZK_OOZIE_SERVICE)
            .id(zkId)
            .payload(map)
            .build();
    }

    /**
     * Returns a list of the metadata provided by all of the Oozie Servers.  Note that the metadata is cached so it may be a second
     * or two stale.
     *
     * @return a List of the metadata provided by all of the Oozie Servers, or {@code null} if the service discovery cache is not
     * available
     */
    public List<ServiceInstance<Map>> getAllMetaData() {
        List<ServiceInstance<Map>> instances = null;
        if (sCache != null) {
            instances = sCache.getInstances();
        }
        return instances;
    }

    /**
     * Returns the ID of this Oozie Server as seen by ZooKeeper and other Oozie Servers
     *
     * @return the ID of this Oozie Server
     */
    public String getZKId() {
        return zkId;
    }

    /**
     * Returns the {@link CuratorFramework} used for managing the ZooKeeper connection; it can be used by calling classes to perform
     * more direct operations on ZooKeeper.  Most of the time, this shouldn't be needed.
     * <p>
     * Be careful not to close the connection.
     *
     * @return the CuratorFramework object
     */
    public CuratorFramework getClient() {
        return client;
    }

    /**
     * Returns the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
     *
     * @param oozies The collection of metadata provided by all of the Oozie Servers (from calling
     * {@link ZKUtils#getAllMetaData()}); {@code null} is treated like an empty list
     * @return the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
     */
    public int getZKIdIndex(List<ServiceInstance<Map>> oozies) {
        int index = 0;
        // getAllMetaData() may return null; with no known servers nobody registered before us
        if (oozies == null) {
            return index;
        }
        // We don't actually have to sort all of the IDs, we can simply find out how many are before our zkId
        for (ServiceInstance<Map> oozie : oozies) {
            long otherRegTime = oozie.getRegistrationTimeUTC();
            if (otherRegTime < zkRegTime) {
                index++;
            }
        }
        return index;
    }

    /**
     * When {@link #ZK_SECURE} is enabled, makes sure the namespace znode and everything below it carry the 'sasl' ACLs.  Nothing
     * is done if the first ACL of the namespace znode already uses the "sasl" scheme.
     *
     * @throws Exception if reading or updating the ACLs fails
     */
    private void checkAndSetACLs() throws Exception {
        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
            // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
            // and set the ACLs for them
            // We can't get the namespace znode through curator; have to go through zk client
            String namespace = "/" + client.getNamespace();
            if (client.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) {
                List<ACL> acls = client.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat());
                // An empty ACL list is treated the same as "not sasl" instead of failing on acls.get(0)
                boolean saslAlreadySet = acls != null && !acls.isEmpty()
                        && acls.get(0).getId().getScheme().equals("sasl");
                if (!saslAlreadySet) {
                    log.info("'sasl' ACLs not set; setting...");
                    List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(namespace, null);
                    for (String child : children) {
                        checkAndSetACLs("/" + child);
                    }
                    client.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1);
                }
            }
        }
    }

    /**
     * Recursively sets the 'sasl' ACLs on the given znode (path relative to the namespace) and all of its descendants; children
     * are updated before their parent.
     *
     * @param path the znode path, relative to the Curator namespace
     * @throws Exception if listing children or setting an ACL fails
     */
    private void checkAndSetACLs(String path) throws Exception {
        List<String> children = client.getChildren().forPath(path);
        for (String child : children) {
            checkAndSetACLs(path + "/" + child);
        }
        client.setACL().withACL(saslACL).forPath(path);
    }

    /**
     * Installs an in-memory JAAS configuration with a "Client" entry built from Oozie's Kerberos keytab and principal settings.
     *
     * @throws ServiceException if the keytab or the principal is configured as an empty value
     * @throws IOException declared for callers; not thrown directly by this method
     */
    // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
    private void setJaasConfiguration() throws ServiceException, IOException {
        String keytabFile = Services.get().getConf().get(KERBEROS_KEYTAB, System.getProperty("user.home") + "/oozie.keytab").trim();
        if (keytabFile.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
        }
        String principal = getConfiguredPrincipal();

        // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
        // point to it (but this way we don't have to write a file, and it works better for the tests)
        JaasConfiguration.addEntry("Client", principal, keytabFile);
        Configuration.setConfiguration(JaasConfiguration.getInstance());
    }

    /**
     * Returns the "service" part of the configured Kerberos principal (the text before the first '/' or '@'), which is what the
     * 'sasl' ACLs are created for.
     *
     * @return the first component of the configured Kerberos principal
     * @throws ServiceException if the principal is configured as an empty value
     */
    private String getServicePrincipal() throws ServiceException {
        return getConfiguredPrincipal().split("[/@]")[0];
    }

    /**
     * Reads the Kerberos principal from the Oozie configuration (default "oozie/localhost@LOCALHOST").
     *
     * @return the configured, non-empty Kerberos principal
     * @throws ServiceException with {@link ErrorCode#E0026} if the principal is configured as an empty value
     */
    private String getConfiguredPrincipal() throws ServiceException {
        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
        if (principal.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
        }
        return principal;
    }

    /**
     * Useful for tests to get the registered classes
     *
     * @return the set of registered classes (the live internal set, not a copy)
     */
    @VisibleForTesting
    public static Set<Object> getUsers() {
        return users;
    }

    /**
     * Keys used in the metadata provided by each Oozie Server to ZooKeeper and other Oozie Servers
     */
    public abstract class ZKMetadataKeys {
        /**
         * The ID of the Oozie Server
         */
        public static final String OOZIE_ID = "OOZIE_ID";
        /**
         * The URL of the Oozie Server
         */
        public static final String OOZIE_URL = "OOZIE_URL";
    }

    /**
     * Simple implementation of an {@link ACLProvider} that simply returns {@link #saslACL}.
     */
    public class SASLOwnerACLProvider implements ACLProvider {

        @Override
        public List<ACL> getDefaultAcl() {
            return saslACL;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            return saslACL;
        }
    }

    /**
     * Returns the retry policy used for the ZooKeeper connection: an exponential backoff with a base sleep time of 1000 ms and
     * at most 3 retries.
     *
     * @return RetryPolicy
     */
    public static RetryPolicy getRetryPolicy() {
        return new ExponentialBackoffRetry(1000, 3);
    }

    /**
     * Returns the configured ZooKeeper namespace
     *
     * @return the value of oozie.zookeeper.namespace
     */
    public static String getZKNameSpace() {
        return ConfigurationService.get(ZK_NAMESPACE);
    }

    /**
     * Returns the ZooKeeper connection timeout
     *
     * @return the connection timeout in seconds, as read from {@link #ZK_CONNECTION_TIMEOUT} when the client was created (0 if no
     * client has been created yet)
     */
    public static int getZKConnectionTimeout() {
        return zkConnectionTimeout;
    }
}