/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.auth.login.Configuration;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.oozie.ErrorCode;
import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_KEYTAB;
import static org.apache.oozie.service.HadoopAccessorService.KERBEROS_PRINCIPAL;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;


/**
 * This class provides a singleton for interacting with ZooKeeper that other classes can use.  It handles connecting to ZooKeeper,
 * service discovery, and publishing metadata about this server.
 * <p>
 * Users of this class should call {@link ZKUtils#register(java.lang.Object)} to obtain the singleton.  This will ensure that we're
 * properly connected and ready to go with ZooKeeper.  When the user is done (i.e. on shutdown), it should call
 * {@link ZKUtils#unregister(java.lang.Object)} to let this class know; once there are no more users, this class will automatically
 * remove itself from ZooKeeper.
 * <p>
 * Each Oozie Server provides metadata that can be shared with the other Oozie Servers.  To keep things simple and to make it easy
 * to add additional metadata in the future, we share a Map.  The keys are defined in {@link ZKMetadataKeys}.
 * <p>
 * For the service discovery, the structure in ZooKeeper is /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ (default is
 * /oozie/services/).  ZKUtils has a service named "servers" under which each Oozie server creates a ZNode named
 * ${OOZIE_SERVICE_INSTANCE} (default is the hostname) that contains the metadata payload.  For example, with the default settings,
 * an Oozie server named "foo" would create a ZNode at /oozie/services/servers/foo where the foo ZNode contains the metadata.
 * <p>
 * If oozie.zookeeper.secure is set to true, then Oozie will (a) use jaas to connect to ZooKeeper using SASL/Kerberos based on
 * Oozie's existing security configuration parameters (b) use/convert every znode under the namespace (including the namespace
 * itself) to have ACLs such that only Oozie servers have access (i.e. if "service/host@REALM" is the Kerberos principal, then
 * "service" will be used for the ACLs).
 */
public class ZKUtils {
    /**
     * oozie-site property for specifying the ZooKeeper connection string.  Comma-separated values of host:port pairs of the
     * ZooKeeper servers.
     */
    public static final String ZK_CONNECTION_STRING = "oozie.zookeeper.connection.string";
    /**
     * oozie-site property for specifying the ZooKeeper namespace to use (e.g. "oozie").  All of the Oozie servers that are planning
     * on talking to each other should have the same value for this.
     */
    public static final String ZK_NAMESPACE = "oozie.zookeeper.namespace";

    /**
     * oozie-env environment variable for specifying the Oozie instance ID
     */
    public static final String OOZIE_INSTANCE_ID = "oozie.instance.id";

    /**
     * oozie-site property for specifying that ZooKeeper is secure.
     */
    public static final String ZK_SECURE = "oozie.zookeeper.secure";

    private static final String ZK_OOZIE_SERVICE = "servers";
    /**
     * Services that need to put a node in zookeeper should go under here.  Try to keep this area clean and organized.
     */
    public static final String ZK_BASE_SERVICES_PATH = "/services";

    // Both 'users' and 'zk' are shared static state; every access must happen while holding the ZKUtils.class monitor
    private static Set<Object> users = new HashSet<Object>();
    private CuratorFramework client = null;
    private String zkId;
    private long zkRegTime;
    private ServiceDiscovery<Map> sDiscovery;
    private ServiceCache<Map> sCache;
    private List<ACL> saslACL;
    private XLog log;

    private static ZKUtils zk = null;

    /**
     * Private Constructor for the singleton; it connects to ZooKeeper and advertises this Oozie Server.  If advertising or the ACL
     * check fails after the connection has been started, everything that was opened is closed again before the exception is
     * propagated.
     *
     * @throws Exception if connecting to ZooKeeper, advertising this server, or setting the ACLs fails
     */
    private ZKUtils() throws Exception {
        log = XLog.getLog(getClass());
        zkId = Services.get().getConf().get(OOZIE_INSTANCE_ID, Services.get().getConf().get("oozie.http.hostname"));
        createClient();
        try {
            advertiseService();
            checkAndSetACLs();
        }
        catch (Exception ex) {
            // The client (and possibly the discovery/cache) is already started; don't leak it when construction fails
            teardown();
            throw ex;
        }
    }

    /**
     * Classes that want to use ZooKeeper should call this method to get the ZKUtils singleton.
     *
     * @param user The calling class
     * @return the ZKUtils singleton
     * @throws Exception if the singleton has to be created and connecting to or advertising on ZooKeeper fails
     */
    public static synchronized ZKUtils register(Object user) throws Exception {
        if (zk == null) {
            zk = new ZKUtils();
        }
        // Remember the calling class so we can disconnect when everybody is done
        users.add(user);
        return zk;
    }

    /**
     * Classes should call this when they are done (i.e. shutdown).
     *
     * @param user The calling class
     */
    public void unregister(Object user) {
        // register() is a static synchronized method and therefore locks on ZKUtils.class; an instance-level synchronized method
        // would lock on 'this' instead, leaving the shared static state unprotected.  Use the same monitor as register().
        synchronized (ZKUtils.class) {
            // If there are no more classes using ZooKeeper, we should teardown everything.
            users.remove(user);
            if (users.isEmpty() && zk != null) {
                zk.teardown();
                zk = null;
            }
        }
    }

    /**
     * Builds and starts the {@link CuratorFramework} client.  When {@link #ZK_SECURE} is true, the JAAS configuration and the
     * SASL-related system properties are set up first and znodes are created with 'sasl' ACLs; otherwise Curator's default ACL
     * provider is used.
     *
     * @throws Exception if the security configuration is invalid or the client cannot be created
     */
    private void createClient() throws Exception {
        // Connect to the ZooKeeper server
        RetryPolicy retryPolicy = ZKUtils.getRetryPolicy();
        String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181");
        String zkNamespace = getZKNameSpace();
        ACLProvider aclProvider;
        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
            log.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
            setJaasConfiguration();
            System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
            System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
            saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal())));
            aclProvider = new SASLOwnerACLProvider();
        } else {
            log.info("Connecting to ZooKeeper without authentication");
            aclProvider = new DefaultACLProvider();     // open to everyone
        }
        client = CuratorFrameworkFactory.builder()
                                        .namespace(zkNamespace)
                                        .connectString(zkConnectionString)
                                        .retryPolicy(retryPolicy)
                                        .aclProvider(aclProvider)
                                        .build();
        client.start();
    }

    /**
     * Registers this server under the {@value #ZK_OOZIE_SERVICE} service, starts the service cache, and records this server's
     * registration time as reported by the service discovery.
     *
     * @throws Exception if any of the ZooKeeper operations fail
     */
    private void advertiseService() throws Exception {
        // Advertise on the service discovery
        new EnsurePath(ZK_BASE_SERVICES_PATH).ensure(client.getZookeeperClient());
        InstanceSerializer<Map> instanceSerializer = new FixedJsonInstanceSerializer<Map>(Map.class);
        sDiscovery = ServiceDiscoveryBuilder.builder(Map.class)
                                            .basePath(ZK_BASE_SERVICES_PATH)
                                            .client(client)
                                            .serializer(instanceSerializer)
                                            .build();
        sDiscovery.start();
        sDiscovery.registerService(getMetadataInstance());

        // Create the service discovery cache
        sCache = sDiscovery.serviceCacheBuilder().name(ZK_OOZIE_SERVICE).build();
        sCache.start();

        zkRegTime = sDiscovery.queryForInstance(ZK_OOZIE_SERVICE, zkId).getRegistrationTimeUTC();
    }

    /**
     * Closes the service cache and removes this server from the service discovery.  Safe to call when startup only got part-way
     * (either object may still be null), and the discovery is closed even if an earlier step throws.
     *
     * @throws Exception if closing the cache, unregistering, or closing the discovery fails
     */
    private void unadvertiseService() throws Exception {
        try {
            // Stop the service discovery cache
            if (sCache != null) {
                sCache.close();
            }
        }
        finally {
            // Unadvertise on the service discovery
            if (sDiscovery != null) {
                try {
                    sDiscovery.unregisterService(getMetadataInstance());
                }
                finally {
                    sDiscovery.close();
                }
            }
        }
    }

    /**
     * Unadvertises this server (best effort; failures are only logged) and closes the ZooKeeper client.
     */
    private void teardown() {
        try {
            unadvertiseService();
        }
        catch (Exception ex) {
            log.warn("Exception occurred while unadvertising: " + ex.getMessage(), ex);
        }
        if (client != null) {
            client.close();
            client = null;
        }
    }

    /**
     * Builds the {@link ServiceInstance} describing this server; its payload is a Map holding the server's ID and URL under the
     * keys defined in {@link ZKMetadataKeys}.
     *
     * @return the service instance for this Oozie Server
     * @throws Exception if the instance cannot be built
     */
    private ServiceInstance<Map> getMetadataInstance() throws Exception {
        // Creates the metadata that this server is providing to ZooKeeper and other Oozie Servers
        String url = ConfigUtils.getOozieEffectiveUrl();
        Map<String, String> map = new HashMap<String, String>();
        map.put(ZKMetadataKeys.OOZIE_ID, zkId);
        map.put(ZKMetadataKeys.OOZIE_URL, url);

        return ServiceInstance.<Map>builder()
            .name(ZK_OOZIE_SERVICE)
            .id(zkId)
            .payload(map)
            .build();
    }

    /**
     * Returns a list of the metadata provided by all of the Oozie Servers.  Note that the metadata is cached so it may be a second
     * or two stale.
     *
     * @return a List of the metadata provided by all of the Oozie Servers, or null if the service cache has not been created
     */
    public List<ServiceInstance<Map>> getAllMetaData() {
        List<ServiceInstance<Map>> instances = null;
        if (sCache != null) {
            instances = sCache.getInstances();
        }
        return instances;
    }

    /**
     * Returns the ID of this Oozie Server as seen by ZooKeeper and other Oozie Servers
     *
     * @return the ID of this Oozie Server
     */
    public String getZKId() {
        return zkId;
    }

    /**
     * Returns the {@link CuratorFramework} used for managing the ZooKeeper connection; it can be used by calling classes to perform
     * more direct operations on ZooKeeper.  Most of the time, this shouldn't be needed.
     * <p>
     * Be careful not to close the connection.
     *
     * @return the CuratorFramework object
     */
    public CuratorFramework getClient() {
        return client;
    }

    /**
     * Returns the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
     *
     * @param oozies The collection of metadata provided by all of the Oozie Servers (from calling {@link ZKUtils#getAllMetaData()})
     * @return the index of this Oozie Server in ZooKeeper's list of Oozie Servers (ordered by registration time)
     */
    public int getZKIdIndex(List<ServiceInstance<Map>> oozies) {
        int index = 0;
        // We don't actually have to sort all of the IDs, we can simply find out how many are before our zkId
        for (ServiceInstance<Map> oozie : oozies) {
            long otherRegTime = oozie.getRegistrationTimeUTC();
            if (otherRegTime < zkRegTime) {
                index++;
            }
        }
        return index;
    }

    /**
     * When {@link #ZK_SECURE} is true, checks whether the namespace znode already carries a 'sasl' ACL and, if not, applies
     * the SASL ACL to every znode under the namespace and finally to the namespace znode itself.
     *
     * @throws Exception if any of the ZooKeeper operations fail
     */
    private void checkAndSetACLs() throws Exception {
        if (Services.get().getConf().getBoolean(ZK_SECURE, false)) {
            // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
            // and set the ACLs for them
            // We can't get the namespace znode through curator; have to go through zk client
            String namespace = "/" + client.getNamespace();
            if (client.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) {
                List<ACL> acls = client.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat());
                if (acls.isEmpty() || !acls.get(0).getId().getScheme().equals("sasl")) {
                    log.info("'sasl' ACLs not set; setting...");
                    List<String> children = client.getZookeeperClient().getZooKeeper().getChildren(namespace, null);
                    for (String child : children) {
                        // getChildren returns bare node names, but the Curator calls in checkAndSetACLs(String) need a path
                        // (relative to the namespace) that starts with '/'
                        checkAndSetACLs("/" + child);
                    }
                    client.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1);
                }
            }
        }
    }

    /**
     * Recursively applies the SASL ACL to the znode at the given path and to all of its descendants (children first).
     *
     * @param path The path of the znode, relative to the Curator namespace; must start with '/'
     * @throws Exception if any of the ZooKeeper operations fail
     */
    private void checkAndSetACLs(String path) throws Exception {
        List<String> children = client.getChildren().forPath(path);
        for (String child : children) {
            checkAndSetACLs(path + "/" + child);
        }
        client.setACL().withACL(saslACL).forPath(path);
    }

    // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
    private void setJaasConfiguration() throws ServiceException, IOException {
        String keytabFile = Services.get().getConf().get(KERBEROS_KEYTAB, System.getProperty("user.home") + "/oozie.keytab").trim();
        if (keytabFile.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_KEYTAB);
        }
        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
        if (principal.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
        }

        // This is equivalent to writing a jaas.conf file and setting the system property, "java.security.auth.login.config", to
        // point to it (but this way we don't have to write a file, and it works better for the tests)
        JaasConfiguration.addEntry("Client", principal, keytabFile);
        Configuration.setConfiguration(JaasConfiguration.getInstance());
    }

    /**
     * Returns the service part of the configured Kerberos principal, i.e. everything before the first '/' or '@' (for
     * "service/host@REALM" this is "service").
     *
     * @return the service name used for the 'sasl' ACLs
     * @throws ServiceException if the configured principal is empty
     */
    private String getServicePrincipal() throws ServiceException {
        String principal = Services.get().getConf().get(KERBEROS_PRINCIPAL, "oozie/localhost@LOCALHOST");
        if (principal.length() == 0) {
            throw new ServiceException(ErrorCode.E0026, KERBEROS_PRINCIPAL);
        }
        return principal.split("[/@]")[0];
    }

    /**
     * Useful for tests to get the registered classes
     *
     * @return the set of registered classes
     */
    @VisibleForTesting
    public static Set<Object> getUsers() {
        return users;
    }

    /**
     * Keys used in the metadata provided by each Oozie Server to ZooKeeper and other Oozie Servers
     */
    public abstract class ZKMetadataKeys {
        /**
         * The ID of the Oozie Server
         */
        public static final String OOZIE_ID = "OOZIE_ID";
        /**
         * The URL of the Oozie Server
         */
        public static final String OOZIE_URL = "OOZIE_URL";
    }

    /**
     * Simple implementation of an {@link ACLProvider} that always returns the enclosing {@link ZKUtils}'s SASL ACL list.
     */
    public class SASLOwnerACLProvider implements ACLProvider {

        @Override
        public List<ACL> getDefaultAcl() {
            return saslACL;
        }

        @Override
        public List<ACL> getAclForPath(String path) {
            return saslACL;
        }
    }

    /**
     * Returns the configured ZooKeeper namespace
     *
     * @return the value of oozie.zookeeper.namespace, or "oozie" if it is not set
     */
    public static String getZKNameSpace() {
        return Services.get().getConf().get(ZK_NAMESPACE, "oozie");
    }

    /**
     * Returns the retry policy used for the ZooKeeper connection: an exponential backoff constructed with the arguments
     * (1000, 3).
     *
     * @return RetryPolicy
     */
    public static RetryPolicy getRetryPolicy() {
        return new ExponentialBackoffRetry(1000, 3);
    }
}