001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.dependency; 020 021import java.io.IOException; 022import java.net.URI; 023import java.net.URISyntaxException; 024import java.security.PrivilegedExceptionAction; 025import java.util.Arrays; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hive.conf.HiveConf; 034import org.apache.hadoop.security.UserGroupInformation; 035import org.apache.hive.hcatalog.api.ConnectionFailureException; 036import org.apache.hive.hcatalog.api.HCatClient; 037import org.apache.hive.hcatalog.api.HCatPartition; 038import org.apache.hive.hcatalog.common.HCatException; 039import org.apache.oozie.ErrorCode; 040import org.apache.oozie.action.hadoop.HCatLauncherURIHandler; 041import org.apache.oozie.action.hadoop.LauncherURIHandler; 042import org.apache.oozie.dependency.hcat.HCatMessageHandler; 043import org.apache.oozie.service.HCatAccessorException; 044import org.apache.oozie.service.HCatAccessorService; 045import org.apache.oozie.service.PartitionDependencyManagerService; 046import org.apache.oozie.service.Services; 047import org.apache.oozie.service.URIHandlerService; 048import org.apache.oozie.util.HCatURI; 049import org.apache.oozie.util.XLog; 050import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; 051import org.apache.hadoop.io.Text; 052import org.apache.hadoop.security.token.Token; 053 054public class HCatURIHandler implements URIHandler { 055 056 private Set<String> supportedSchemes; 057 private Map<String, DependencyType> dependencyTypes; 058 private List<Class<?>> classesToShip; 059 060 @Override 061 public void init(Configuration conf) { 062 dependencyTypes = new HashMap<String, DependencyType>(); 063 supportedSchemes = new HashSet<String>(); 064 String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX 065 + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat"); 066 supportedSchemes.addAll(Arrays.asList(schemes)); 067 classesToShip = new HCatLauncherURIHandler().getClassesForLauncher(); 068 } 069 070 @Override 071 public Set<String> getSupportedSchemes() { 072 return supportedSchemes; 073 } 074 075 @Override 076 public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() { 077 return HCatLauncherURIHandler.class; 078 } 079 080 @Override 081 public List<Class<?>> getClassesForLauncher() { 082 return classesToShip; 083 } 084 085 @Override 086 public DependencyType getDependencyType(URI uri) throws URIHandlerException { 087 DependencyType depType = DependencyType.PULL; 088 // Not initializing in constructor as this will be part of oozie.services.ext 089 // and will be initialized after URIHandlerService 090 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 091 if (hcatService != null) { 092 depType = dependencyTypes.get(uri.getAuthority()); 093 if (depType == null) { 094 depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL; 095 dependencyTypes.put(uri.getAuthority(), depType); 096 } 097 } 098 return depType; 099 } 100 101 @Override 102 public void registerForNotification(URI uri, Configuration conf, String user, String actionID) 103 throws URIHandlerException { 104 HCatURI hcatURI; 105 try { 106 hcatURI = new HCatURI(uri); 107 } 108 catch (URISyntaxException e) { 109 throw new URIHandlerException(ErrorCode.E0906, uri, e); 110 } 111 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 112 if (!hcatService.isRegisteredForNotification(hcatURI)) { 113 HCatClient client = getHCatClient(uri, conf); 114 try { 115 String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable()); 116 if (topic == null) { 117 return; 118 } 119 hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority())); 120 } 121 catch (HCatException e) { 122 throw new HCatAccessorException(ErrorCode.E1501, e); 123 } 124 finally { 125 closeQuietly(client, null, true); 126 } 127 } 128 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 129 pdmService.addMissingDependency(hcatURI, actionID); 130 } 131 132 @Override 133 public boolean unregisterFromNotification(URI uri, String actionID) { 134 HCatURI hcatURI; 135 try { 136 hcatURI = new HCatURI(uri); 137 } 138 catch (URISyntaxException e) { 139 throw new RuntimeException(e); // Unexpected at this point 140 } 141 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 142 return pdmService.removeMissingDependency(hcatURI, actionID); 143 } 144 145 @Override 146 public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) 147 throws URIHandlerException { 148 HCatContext context = null; 149 //read operations are allowed for any user in HCat and so accessing as Oozie server itself 150 //For write operations, perform doAs as user 151 if (readOnly) { 152 HCatClient client = getHCatClient(uri, conf); 153 context = new HCatContext(conf, user, client); 154 } 155 else { 156 HCatClientWithToken client = getHCatClient(uri, conf, user); 157 context = new HCatContext(conf, user, client); 158 } 159 return context; 160 } 161 162 @Override 163 public boolean exists(URI uri, Context context) throws URIHandlerException { 164 HCatClient client = ((HCatContext) context).getHCatClient(); 165 return exists(uri, client, false); 166 } 167 168 @Override 169 public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException { 170 HCatClient client = getHCatClient(uri, conf); 171 return exists(uri, client, true); 172 } 173 174 @Override 175 public void delete(URI uri, Context context) throws URIHandlerException { 176 HCatClient client = ((HCatContext) context).getHCatClient(); 177 try { 178 HCatURI hcatUri = new HCatURI(uri); 179 client.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true); 180 } 181 catch (URISyntaxException e) { 182 throw new HCatAccessorException(ErrorCode.E1501, e); 183 } 184 catch (HCatException e) { 185 throw new HCatAccessorException(ErrorCode.E1501, e); 186 } 187 } 188 189 @Override 190 public void delete(URI uri, Configuration conf, String user) throws URIHandlerException { 191 HCatClientWithToken client = null; 192 try { 193 HCatURI hcatUri = new HCatURI(uri); 194 client = getHCatClient(uri, conf, user); 195 client.getHCatClient().dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true); 196 } 197 catch (URISyntaxException e){ 198 throw new HCatAccessorException(ErrorCode.E1501, e); 199 } 200 catch (HCatException e) { 201 throw new HCatAccessorException(ErrorCode.E1501, e); 202 } 203 finally { 204 closeQuietly(client.getHCatClient(), client.getDelegationToken(),true); 205 } 206 } 207 208 @Override 209 public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException { 210 return uri; 211 } 212 213 @Override 214 public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException { 215 return uri; 216 } 217 218 @Override 219 public void validate(String uri) throws URIHandlerException { 220 try { 221 new HCatURI(uri); // will fail if uri syntax is incorrect 222 } 223 catch (URISyntaxException e) { 224 throw new URIHandlerException(ErrorCode.E0906, uri, e); 225 } 226 227 } 228 229 @Override 230 public void destroy() { 231 232 } 233 234 private HiveConf getHiveConf(URI uri, Configuration conf){ 235 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 236 if (hcatService.getHCatConf() != null) { 237 conf = hcatService.getHCatConf(); 238 } 239 HiveConf hiveConf = new HiveConf(conf, this.getClass()); 240 String serverURI = getMetastoreConnectURI(uri); 241 if (!serverURI.equals("")) { 242 hiveConf.set("hive.metastore.local", "false"); 243 } 244 hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI); 245 return hiveConf; 246 } 247 248 private HCatClient getHCatClient(URI uri, Configuration conf) throws HCatAccessorException { 249 HiveConf hiveConf = getHiveConf(uri, conf); 250 try { 251 XLog.getLog(HCatURIHandler.class).info("Creating HCatClient for login_user [{0}] and server [{1}] ", 252 UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); 253 return HCatClient.create(hiveConf); 254 } 255 catch (HCatException e) { 256 throw new HCatAccessorException(ErrorCode.E1501, e); 257 } 258 catch (IOException e) { 259 throw new HCatAccessorException(ErrorCode.E1501, e); 260 } 261 } 262 263 private HCatClientWithToken getHCatClient(URI uri, Configuration conf, String user) 264 throws HCatAccessorException { 265 final HiveConf hiveConf = getHiveConf(uri, conf); 266 String delegationToken = null; 267 try { 268 // Get UGI to doAs() as the specified user 269 UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); 270 // Define the label for the Delegation Token for the HCat instance. 271 hiveConf.set("hive.metastore.token.signature", "HCatTokenSignature"); 272 if (hiveConf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, false)) { 273 HCatClient tokenClient = null; 274 try { 275 // Retrieve Delegation token for HCatalog 276 tokenClient = HCatClient.create(hiveConf); 277 delegationToken = tokenClient.getDelegationToken(user, UserGroupInformation.getLoginUser() 278 .getUserName()); 279 // Store Delegation token in the UGI 280 Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(); 281 token.decodeFromUrlString(delegationToken); 282 token.setService(new Text(hiveConf.get("hive.metastore.token.signature"))); 283 ugi.addToken(token); 284 } 285 finally { 286 if (tokenClient != null) { 287 tokenClient.close(); 288 } 289 } 290 } 291 XLog.getLog(HCatURIHandler.class).info( 292 "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user, 293 UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); 294 HCatClient hcatClient = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() { 295 @Override 296 public HCatClient run() throws Exception { 297 HCatClient client = HCatClient.create(hiveConf); 298 return client; 299 } 300 }); 301 HCatClientWithToken clientWithToken = new HCatClientWithToken(hcatClient, delegationToken); 302 return clientWithToken; 303 } 304 catch (IOException e) { 305 throw new HCatAccessorException(ErrorCode.E1501, e.getMessage()); 306 } 307 catch (Exception e) { 308 throw new HCatAccessorException(ErrorCode.E1501, e.getMessage()); 309 } 310 } 311 312 private String getMetastoreConnectURI(URI uri) { 313 String metastoreURI; 314 // For unit tests 315 if (uri.getAuthority().equals("unittest-local")) { 316 metastoreURI = ""; 317 } 318 else { 319 // Hardcoding hcat to thrift mapping till support for webhcat(templeton) 320 // is added 321 metastoreURI = "thrift://" + uri.getAuthority(); 322 } 323 return metastoreURI; 324 } 325 326 private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException { 327 try { 328 HCatURI hcatURI = new HCatURI(uri.toString()); 329 List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(), 330 hcatURI.getPartitionMap()); 331 return (partitions != null && !partitions.isEmpty()); 332 } 333 catch (ConnectionFailureException e) { 334 throw new HCatAccessorException(ErrorCode.E1501, e); 335 } 336 catch (HCatException e) { 337 throw new HCatAccessorException(ErrorCode.E0902, e); 338 } 339 catch (URISyntaxException e) { 340 throw new HCatAccessorException(ErrorCode.E0902, e); 341 } 342 finally { 343 closeQuietly(client, null, closeClient); 344 } 345 } 346 347 private void closeQuietly(HCatClient client, String delegationToken, boolean close) { 348 if (close && client != null) { 349 try { 350 if(delegationToken != null && !delegationToken.isEmpty()) { 351 client.cancelDelegationToken(delegationToken); 352 } 353 client.close(); 354 } 355 catch (Exception ignore) { 356 XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore); 357 } 358 } 359 } 360 361 class HCatClientWithToken { 362 private HCatClient hcatClient; 363 private String token; 364 365 public HCatClientWithToken(HCatClient client, String delegationToken) { 366 this.hcatClient = client; 367 this.token = delegationToken; 368 } 369 370 public HCatClient getHCatClient() { 371 return this.hcatClient; 372 } 373 374 public String getDelegationToken() { 375 return this.token; 376 } 377 } 378 379 static class HCatContext extends Context { 380 381 private HCatClient hcatClient; 382 private String delegationToken; 383 384 /** 385 * Create a HCatContext that can be used to access a hcat URI 386 * 387 * @param conf Configuration to access the URI 388 * @param user name of the user the URI should be accessed as 389 * @param hcatClient HCatClient to talk to hcatalog server 390 */ 391 public HCatContext(Configuration conf, String user, HCatClient hcatClient) { 392 super(conf, user); 393 this.hcatClient = hcatClient; 394 } 395 396 public HCatContext(Configuration conf, String user, HCatClientWithToken hcatClient) { 397 super(conf, user); 398 this.hcatClient = hcatClient.getHCatClient(); 399 this.delegationToken = hcatClient.getDelegationToken(); 400 } 401 402 /** 403 * Get the HCatClient to talk to hcatalog server 404 * 405 * @return HCatClient to talk to hcatalog server 406 */ 407 public HCatClient getHCatClient() { 408 return hcatClient; 409 } 410 411 /** 412 * Get the Delegation token to access HCat 413 * 414 * @return delegationToken 415 */ 416 public String getDelegationToken() { 417 return delegationToken; 418 } 419 420 @Override 421 public void destroy() { 422 try { 423 if (delegationToken != null && !delegationToken.isEmpty()) { 424 hcatClient.cancelDelegationToken(delegationToken); 425 } 426 delegationToken = null; 427 } 428 catch (Exception ignore) { 429 XLog.getLog(HCatContext.class).warn("Error cancelling delegation token", ignore); 430 } 431 try { 432 hcatClient.close(); 433 } 434 catch (Exception ignore) { 435 XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore); 436 } 437 } 438 439 } 440 441}