001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.dependency; 020 021import java.io.IOException; 022import java.net.URI; 023import java.net.URISyntaxException; 024import java.security.PrivilegedExceptionAction; 025import java.util.Arrays; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hive.conf.HiveConf; 034import org.apache.hadoop.security.UserGroupInformation; 035import org.apache.hive.hcatalog.api.ConnectionFailureException; 036import org.apache.hive.hcatalog.api.HCatClient; 037import org.apache.hive.hcatalog.api.HCatPartition; 038import org.apache.hive.hcatalog.common.HCatException; 039import org.apache.oozie.ErrorCode; 040import org.apache.oozie.action.hadoop.HCatLauncherURIHandler; 041import org.apache.oozie.action.hadoop.LauncherURIHandler; 042import org.apache.oozie.dependency.hcat.HCatMessageHandler; 043import org.apache.oozie.service.HCatAccessorException; 044import org.apache.oozie.service.HCatAccessorService; 045import org.apache.oozie.service.PartitionDependencyManagerService; 046import org.apache.oozie.service.Services; 047import org.apache.oozie.service.URIHandlerService; 048import org.apache.oozie.util.HCatURI; 049import org.apache.oozie.util.XLog; 050import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; 051import org.apache.hadoop.io.Text; 052import org.apache.hadoop.security.token.Token; 053 054public class HCatURIHandler implements URIHandler { 055 056 private Set<String> supportedSchemes; 057 private Map<String, DependencyType> dependencyTypes; 058 private List<Class<?>> classesToShip; 059 060 @Override 061 public void init(Configuration conf) { 062 dependencyTypes = new HashMap<String, DependencyType>(); 063 supportedSchemes = new HashSet<String>(); 064 String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX 065 + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat"); 066 supportedSchemes.addAll(Arrays.asList(schemes)); 067 classesToShip = new HCatLauncherURIHandler().getClassesForLauncher(); 068 } 069 070 @Override 071 public Set<String> getSupportedSchemes() { 072 return supportedSchemes; 073 } 074 075 @Override 076 public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() { 077 return HCatLauncherURIHandler.class; 078 } 079 080 @Override 081 public List<Class<?>> getClassesForLauncher() { 082 return classesToShip; 083 } 084 085 @Override 086 public DependencyType getDependencyType(URI uri) throws URIHandlerException { 087 DependencyType depType = DependencyType.PULL; 088 // Not initializing in constructor as this will be part of oozie.services.ext 089 // and will be initialized after URIHandlerService 090 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 091 if (hcatService != null) { 092 depType = dependencyTypes.get(uri.getAuthority()); 093 if (depType == null) { 094 depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL; 095 dependencyTypes.put(uri.getAuthority(), depType); 096 } 097 } 098 return depType; 099 } 100 101 @Override 102 public void registerForNotification(URI uri, Configuration conf, String user, String actionID) 103 throws URIHandlerException { 104 HCatURI hcatURI; 105 try { 106 hcatURI = new HCatURI(uri); 107 } 108 catch (URISyntaxException e) { 109 throw new URIHandlerException(ErrorCode.E0906, uri, e); 110 } 111 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 112 if (!hcatService.isRegisteredForNotification(hcatURI)) { 113 HCatClient client = getHCatClient(uri, conf); 114 try { 115 String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable()); 116 if (topic == null) { 117 return; 118 } 119 hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority())); 120 } 121 catch (HCatException e) { 122 throw new HCatAccessorException(ErrorCode.E1501, e); 123 } 124 finally { 125 closeQuietly(client, null, true); 126 } 127 } 128 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 129 pdmService.addMissingDependency(hcatURI, actionID); 130 } 131 132 @Override 133 public boolean unregisterFromNotification(URI uri, String actionID) { 134 HCatURI hcatURI; 135 try { 136 hcatURI = new HCatURI(uri); 137 } 138 catch (URISyntaxException e) { 139 throw new RuntimeException(e); // Unexpected at this point 140 } 141 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 142 return pdmService.removeMissingDependency(hcatURI, actionID); 143 } 144 145 @Override 146 public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) 147 throws URIHandlerException { 148 HCatContext context = null; 149 //read operations are allowed for any user in HCat and so accessing as Oozie server itself 150 //For write operations, perform doAs as user 151 if (readOnly) { 152 HCatClient client = getHCatClient(uri, conf); 153 context = new HCatContext(conf, user, client); 154 } 155 else { 156 HCatClientWithToken client = getHCatClient(uri, conf, user); 157 context = new HCatContext(conf, user, client); 158 } 159 return context; 160 } 161 162 @Override 163 public boolean exists(URI uri, Context context) throws URIHandlerException { 164 HCatClient client = ((HCatContext) context).getHCatClient(); 165 return exists(uri, client, false); 166 } 167 168 @Override 169 public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException { 170 HCatClient client = getHCatClient(uri, conf); 171 return exists(uri, client, true); 172 } 173 174 @Override 175 public void delete(URI uri, Context context) throws URIHandlerException { 176 HCatClient client = ((HCatContext) context).getHCatClient(); 177 try { 178 HCatURI hcatUri = new HCatURI(uri); 179 if (!hcatUri.getPartitionMap().isEmpty()) { 180 client.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true); 181 } else { 182 client.dropTable(hcatUri.getDb(), hcatUri.getTable(), true); 183 } 184 } 185 catch (URISyntaxException e) { 186 throw new HCatAccessorException(ErrorCode.E1501, e); 187 } 188 catch (HCatException e) { 189 throw new HCatAccessorException(ErrorCode.E1501, e); 190 } 191 } 192 193 @Override 194 public void delete(URI uri, Configuration conf, String user) throws URIHandlerException { 195 HCatClientWithToken client = null; 196 HCatClient hCatClient = null; 197 try { 198 HCatURI hcatUri = new HCatURI(uri); 199 client = getHCatClient(uri, conf, user); 200 hCatClient = client.getHCatClient(); 201 if (!hcatUri.getPartitionMap().isEmpty()) { 202 hCatClient.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true); 203 } else { 204 hCatClient.dropTable(hcatUri.getDb(), hcatUri.getTable(), true); 205 } 206 } 207 catch (URISyntaxException e){ 208 throw new HCatAccessorException(ErrorCode.E1501, e); 209 } 210 catch (HCatException e) { 211 throw new HCatAccessorException(ErrorCode.E1501, e); 212 } 213 finally { 214 closeQuietly(hCatClient, client != null ? client.getDelegationToken() : null, true); 215 } 216 } 217 218 @Override 219 public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException { 220 return uri; 221 } 222 223 @Override 224 public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException { 225 return uri; 226 } 227 228 @Override 229 public void validate(String uri) throws URIHandlerException { 230 try { 231 new HCatURI(uri); // will fail if uri syntax is incorrect 232 } 233 catch (URISyntaxException e) { 234 throw new URIHandlerException(ErrorCode.E0906, uri, e); 235 } 236 237 } 238 239 @Override 240 public void destroy() { 241 242 } 243 244 private HiveConf getHiveConf(URI uri, Configuration conf) throws HCatAccessorException { 245 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 246 if (hcatService.getHCatConf() != null) { 247 conf = hcatService.getHCatConf(); 248 } 249 HiveConf hiveConf = new HiveConf(conf, this.getClass()); 250 String serverURI = getMetastoreConnectURI(uri); 251 if (!serverURI.equals("")) { 252 hiveConf.set("hive.metastore.local", "false"); 253 } 254 hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI); 255 return hiveConf; 256 } 257 258 private HCatClient getHCatClient(URI uri, Configuration conf) throws HCatAccessorException { 259 HiveConf hiveConf = getHiveConf(uri, conf); 260 try { 261 XLog.getLog(HCatURIHandler.class).info("Creating HCatClient for login_user [{0}] and server [{1}] ", 262 UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); 263 return HCatClient.create(hiveConf); 264 } 265 catch (HCatException e) { 266 throw new HCatAccessorException(ErrorCode.E1501, e); 267 } 268 catch (IOException e) { 269 throw new HCatAccessorException(ErrorCode.E1501, e); 270 } 271 } 272 273 private HCatClientWithToken getHCatClient(URI uri, Configuration conf, String user) 274 throws HCatAccessorException { 275 final HiveConf hiveConf = getHiveConf(uri, conf); 276 String delegationToken = null; 277 try { 278 // Get UGI to doAs() as the specified user 279 UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); 280 // Define the label for the Delegation Token for the HCat instance. 281 hiveConf.set("hive.metastore.token.signature", "HCatTokenSignature"); 282 if (hiveConf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, false)) { 283 HCatClient tokenClient = null; 284 try { 285 // Retrieve Delegation token for HCatalog 286 tokenClient = HCatClient.create(hiveConf); 287 delegationToken = tokenClient.getDelegationToken(user, UserGroupInformation.getLoginUser() 288 .getUserName()); 289 // Store Delegation token in the UGI 290 Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(); 291 token.decodeFromUrlString(delegationToken); 292 token.setService(new Text(hiveConf.get("hive.metastore.token.signature"))); 293 ugi.addToken(token); 294 } 295 finally { 296 if (tokenClient != null) { 297 tokenClient.close(); 298 } 299 } 300 } 301 XLog.getLog(HCatURIHandler.class).info( 302 "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user, 303 UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); 304 HCatClient hcatClient = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() { 305 @Override 306 public HCatClient run() throws Exception { 307 HCatClient client = HCatClient.create(hiveConf); 308 return client; 309 } 310 }); 311 HCatClientWithToken clientWithToken = new HCatClientWithToken(hcatClient, delegationToken); 312 return clientWithToken; 313 } 314 catch (IOException e) { 315 throw new HCatAccessorException(ErrorCode.E1501, e.getMessage()); 316 } 317 catch (Exception e) { 318 throw new HCatAccessorException(ErrorCode.E1501, e.getMessage()); 319 } 320 } 321 322 private String getMetastoreConnectURI(URI uri) throws HCatAccessorException { 323 String metastoreURI; 324 // For unit tests 325 if (uri.getAuthority().equals("unittest-local")) { 326 metastoreURI = ""; 327 } 328 else { 329 // Hardcoding hcat to thrift mapping till support for webhcat(templeton) 330 // is added 331 HCatURI hCatURI; 332 try { 333 hCatURI = new HCatURI(uri.toString()); 334 metastoreURI = hCatURI.getServerEndPointWithScheme("thrift"); 335 } catch (URISyntaxException e) { 336 throw new HCatAccessorException(ErrorCode.E0902, e); 337 } 338 } 339 return metastoreURI; 340 } 341 342 private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException { 343 try { 344 boolean flag; 345 HCatURI hcatURI = new HCatURI(uri.toString()); 346 if (!hcatURI.getPartitionMap().isEmpty()) { 347 List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(), 348 hcatURI.getPartitionMap()); 349 flag = partitions != null && !partitions.isEmpty(); 350 } else { 351 List<String> tables = client.listTableNamesByPattern(hcatURI.getDb(), hcatURI.getTable()); 352 flag = tables != null && !tables.isEmpty(); 353 } 354 return (flag); 355 } 356 catch (ConnectionFailureException e) { 357 throw new HCatAccessorException(ErrorCode.E1501, e); 358 } 359 catch (HCatException e) { 360 throw new HCatAccessorException(ErrorCode.E0902, e); 361 } 362 catch (URISyntaxException e) { 363 throw new HCatAccessorException(ErrorCode.E0902, e); 364 } 365 finally { 366 closeQuietly(client, null, closeClient); 367 } 368 } 369 370 private void closeQuietly(HCatClient client, String delegationToken, boolean close) { 371 if (close && client != null) { 372 try { 373 if(delegationToken != null && !delegationToken.isEmpty()) { 374 client.cancelDelegationToken(delegationToken); 375 } 376 client.close(); 377 } 378 catch (Exception ignore) { 379 XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore); 380 } 381 } 382 } 383 384 class HCatClientWithToken { 385 private HCatClient hcatClient; 386 private String token; 387 388 public HCatClientWithToken(HCatClient client, String delegationToken) { 389 this.hcatClient = client; 390 this.token = delegationToken; 391 } 392 393 public HCatClient getHCatClient() { 394 return this.hcatClient; 395 } 396 397 public String getDelegationToken() { 398 return this.token; 399 } 400 } 401 402 static class HCatContext extends Context { 403 404 private HCatClient hcatClient; 405 private String delegationToken; 406 407 /** 408 * Create a HCatContext that can be used to access a hcat URI 409 * 410 * @param conf Configuration to access the URI 411 * @param user name of the user the URI should be accessed as 412 * @param hcatClient HCatClient to talk to hcatalog server 413 */ 414 public HCatContext(Configuration conf, String user, HCatClient hcatClient) { 415 super(conf, user); 416 this.hcatClient = hcatClient; 417 } 418 419 public HCatContext(Configuration conf, String user, HCatClientWithToken hcatClient) { 420 super(conf, user); 421 this.hcatClient = hcatClient.getHCatClient(); 422 this.delegationToken = hcatClient.getDelegationToken(); 423 } 424 425 /** 426 * Get the HCatClient to talk to hcatalog server 427 * 428 * @return HCatClient to talk to hcatalog server 429 */ 430 public HCatClient getHCatClient() { 431 return hcatClient; 432 } 433 434 /** 435 * Get the Delegation token to access HCat 436 * 437 * @return delegationToken 438 */ 439 public String getDelegationToken() { 440 return delegationToken; 441 } 442 443 @Override 444 public void destroy() { 445 try { 446 if (delegationToken != null && !delegationToken.isEmpty()) { 447 hcatClient.cancelDelegationToken(delegationToken); 448 } 449 delegationToken = null; 450 } 451 catch (Exception ignore) { 452 XLog.getLog(HCatContext.class).warn("Error cancelling delegation token", ignore); 453 } 454 try { 455 hcatClient.close(); 456 } 457 catch (Exception ignore) { 458 XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore); 459 } 460 } 461 462 } 463 464}