001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.dependency; 020 021import java.io.IOException; 022import java.net.URI; 023import java.net.URISyntaxException; 024import java.security.PrivilegedExceptionAction; 025import java.util.Arrays; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hive.conf.HiveConf; 034import org.apache.hadoop.hive.shims.ShimLoader; 035import org.apache.hadoop.security.UserGroupInformation; 036import org.apache.hive.hcatalog.api.ConnectionFailureException; 037import org.apache.hive.hcatalog.api.HCatClient; 038import org.apache.hive.hcatalog.api.HCatPartition; 039import org.apache.hive.hcatalog.common.HCatException; 040import org.apache.oozie.ErrorCode; 041import org.apache.oozie.action.hadoop.HCatLauncherURIHandler; 042import org.apache.oozie.action.hadoop.LauncherURIHandler; 043import org.apache.oozie.dependency.hcat.HCatMessageHandler; 044import org.apache.oozie.service.HCatAccessorException; 045import org.apache.oozie.service.HCatAccessorService; 046import org.apache.oozie.service.PartitionDependencyManagerService; 047import org.apache.oozie.service.Services; 048import org.apache.oozie.service.URIHandlerService; 049import org.apache.oozie.util.HCatURI; 050import org.apache.oozie.util.XLog; 051 052public class HCatURIHandler implements URIHandler { 053 054 private Set<String> supportedSchemes; 055 private Map<String, DependencyType> dependencyTypes; 056 private List<Class<?>> classesToShip; 057 058 @Override 059 public void init(Configuration conf) { 060 dependencyTypes = new HashMap<String, DependencyType>(); 061 supportedSchemes = new HashSet<String>(); 062 String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX 063 + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat"); 064 supportedSchemes.addAll(Arrays.asList(schemes)); 065 classesToShip = new HCatLauncherURIHandler().getClassesForLauncher(); 066 } 067 068 @Override 069 public Set<String> getSupportedSchemes() { 070 return supportedSchemes; 071 } 072 073 @Override 074 public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() { 075 return HCatLauncherURIHandler.class; 076 } 077 078 @Override 079 public List<Class<?>> getClassesForLauncher() { 080 return classesToShip; 081 } 082 083 @Override 084 public DependencyType getDependencyType(URI uri) throws URIHandlerException { 085 DependencyType depType = DependencyType.PULL; 086 // Not initializing in constructor as this will be part of oozie.services.ext 087 // and will be initialized after URIHandlerService 088 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 089 if (hcatService != null) { 090 depType = dependencyTypes.get(uri.getAuthority()); 091 if (depType == null) { 092 depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL; 093 dependencyTypes.put(uri.getAuthority(), depType); 094 } 095 } 096 return depType; 097 } 098 099 @Override 100 public void registerForNotification(URI uri, Configuration conf, String user, String actionID) 101 throws URIHandlerException { 102 HCatURI hcatURI; 103 try { 104 hcatURI = new HCatURI(uri); 105 } 106 catch (URISyntaxException e) { 107 throw new URIHandlerException(ErrorCode.E0906, uri, e); 108 } 109 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 110 if (!hcatService.isRegisteredForNotification(hcatURI)) { 111 HCatClient client = getHCatClient(uri, conf); 112 try { 113 String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable()); 114 if (topic == null) { 115 return; 116 } 117 hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority())); 118 } 119 catch (HCatException e) { 120 throw new HCatAccessorException(ErrorCode.E1501, e); 121 } 122 finally { 123 closeQuietly(client, null, true); 124 } 125 } 126 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 127 pdmService.addMissingDependency(hcatURI, actionID); 128 } 129 130 @Override 131 public boolean unregisterFromNotification(URI uri, String actionID) { 132 HCatURI hcatURI; 133 try { 134 hcatURI = new HCatURI(uri); 135 } 136 catch (URISyntaxException e) { 137 throw new RuntimeException(e); // Unexpected at this point 138 } 139 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 140 return pdmService.removeMissingDependency(hcatURI, actionID); 141 } 142 143 @Override 144 public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) 145 throws URIHandlerException { 146 HCatContext context = null; 147 //read operations are allowed for any user in HCat and so accessing as Oozie server itself 148 //For write operations, perform doAs as user 149 if (readOnly) { 150 HCatClient client = getHCatClient(uri, conf); 151 context = new HCatContext(conf, user, client); 152 } 153 else { 154 HCatClientWithToken client = getHCatClient(uri, conf, user); 155 context = new HCatContext(conf, user, client); 156 } 157 return context; 158 } 159 160 @Override 161 public boolean exists(URI uri, Context context) throws URIHandlerException { 162 HCatClient client = ((HCatContext) context).getHCatClient(); 163 return exists(uri, client, false); 164 } 165 166 @Override 167 public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException { 168 HCatClient client = getHCatClient(uri, conf); 169 return exists(uri, client, true); 170 } 171 172 @Override 173 public void delete(URI uri, Context context) throws URIHandlerException { 174 HCatClient client = ((HCatContext) context).getHCatClient(); 175 try { 176 HCatURI hcatUri = new HCatURI(uri); 177 client.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true); 178 } 179 catch (URISyntaxException e) { 180 throw new HCatAccessorException(ErrorCode.E1501, e); 181 } 182 catch (HCatException e) { 183 throw new HCatAccessorException(ErrorCode.E1501, e); 184 } 185 } 186 187 @Override 188 public void delete(URI uri, Configuration conf, String user) throws URIHandlerException { 189 HCatClientWithToken client = null; 190 try { 191 HCatURI hcatUri = new HCatURI(uri); 192 client = getHCatClient(uri, conf, user); 193 client.getHCatClient().dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true); 194 } 195 catch (URISyntaxException e){ 196 throw new HCatAccessorException(ErrorCode.E1501, e); 197 } 198 catch (HCatException e) { 199 throw new HCatAccessorException(ErrorCode.E1501, e); 200 } 201 finally { 202 closeQuietly(client.getHCatClient(), client.getDelegationToken(),true); 203 } 204 } 205 206 @Override 207 public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException { 208 return uri; 209 } 210 211 @Override 212 public void validate(String uri) throws URIHandlerException { 213 try { 214 new HCatURI(uri); // will fail if uri syntax is incorrect 215 } 216 catch (URISyntaxException e) { 217 throw new URIHandlerException(ErrorCode.E0906, uri, e); 218 } 219 220 } 221 222 @Override 223 public void destroy() { 224 225 } 226 227 private HiveConf getHiveConf(URI uri, Configuration conf){ 228 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 229 if (hcatService.getHCatConf() != null) { 230 conf = hcatService.getHCatConf(); 231 } 232 HiveConf hiveConf = new HiveConf(conf, this.getClass()); 233 String serverURI = getMetastoreConnectURI(uri); 234 if (!serverURI.equals("")) { 235 hiveConf.set("hive.metastore.local", "false"); 236 } 237 hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI); 238 return hiveConf; 239 } 240 241 private HCatClient getHCatClient(URI uri, Configuration conf) throws HCatAccessorException { 242 HiveConf hiveConf = getHiveConf(uri, conf); 243 try { 244 XLog.getLog(HCatURIHandler.class).info("Creating HCatClient for login_user [{0}] and server [{1}] ", 245 UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); 246 return HCatClient.create(hiveConf); 247 } 248 catch (HCatException e) { 249 throw new HCatAccessorException(ErrorCode.E1501, e); 250 } 251 catch (IOException e) { 252 throw new HCatAccessorException(ErrorCode.E1501, e); 253 } 254 } 255 256 private HCatClientWithToken getHCatClient(URI uri, Configuration conf, String user) 257 throws HCatAccessorException { 258 final HiveConf hiveConf = getHiveConf(uri, conf); 259 String delegationToken = null; 260 try { 261 // Get UGI to doAs() as the specified user 262 UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); 263 // Define the label for the Delegation Token for the HCat instance. 264 hiveConf.set("hive.metastore.token.signature", "HCatTokenSignature"); 265 if (hiveConf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, false)) { 266 HCatClient tokenClient = null; 267 try { 268 // Retrieve Delegation token for HCatalog 269 tokenClient = HCatClient.create(hiveConf); 270 delegationToken = tokenClient.getDelegationToken(user, UserGroupInformation.getLoginUser() 271 .getUserName()); 272 // Store Delegation token in the UGI 273 ShimLoader.getHadoopShims().setTokenStr(ugi, delegationToken, 274 hiveConf.get("hive.metastore.token.signature")); 275 } 276 finally { 277 if (tokenClient != null) 278 tokenClient.close(); 279 } 280 } 281 XLog.getLog(HCatURIHandler.class).info( 282 "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user, 283 UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); 284 HCatClient hcatClient = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() { 285 @Override 286 public HCatClient run() throws Exception { 287 HCatClient client = HCatClient.create(hiveConf); 288 return client; 289 } 290 }); 291 HCatClientWithToken clientWithToken = new HCatClientWithToken(hcatClient, delegationToken); 292 return clientWithToken; 293 } 294 catch (IOException e) { 295 throw new HCatAccessorException(ErrorCode.E1501, e.getMessage()); 296 } 297 catch (Exception e) { 298 throw new HCatAccessorException(ErrorCode.E1501, e.getMessage()); 299 } 300 } 301 302 private String getMetastoreConnectURI(URI uri) { 303 String metastoreURI; 304 // For unit tests 305 if (uri.getAuthority().equals("unittest-local")) { 306 metastoreURI = ""; 307 } 308 else { 309 // Hardcoding hcat to thrift mapping till support for webhcat(templeton) 310 // is added 311 metastoreURI = "thrift://" + uri.getAuthority(); 312 } 313 return metastoreURI; 314 } 315 316 private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException { 317 try { 318 HCatURI hcatURI = new HCatURI(uri.toString()); 319 List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(), 320 hcatURI.getPartitionMap()); 321 return (partitions != null && !partitions.isEmpty()); 322 } 323 catch (ConnectionFailureException e) { 324 throw new HCatAccessorException(ErrorCode.E1501, e); 325 } 326 catch (HCatException e) { 327 throw new HCatAccessorException(ErrorCode.E0902, e); 328 } 329 catch (URISyntaxException e) { 330 throw new HCatAccessorException(ErrorCode.E0902, e); 331 } 332 finally { 333 closeQuietly(client, null, closeClient); 334 } 335 } 336 337 private void closeQuietly(HCatClient client, String delegationToken, boolean close) { 338 if (close && client != null) { 339 try { 340 if(delegationToken != null && !delegationToken.isEmpty()) { 341 client.cancelDelegationToken(delegationToken); 342 } 343 client.close(); 344 } 345 catch (Exception ignore) { 346 XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore); 347 } 348 } 349 } 350 351 class HCatClientWithToken { 352 private HCatClient hcatClient; 353 private String token; 354 355 public HCatClientWithToken(HCatClient client, String delegationToken) { 356 this.hcatClient = client; 357 this.token = delegationToken; 358 } 359 360 public HCatClient getHCatClient() { 361 return this.hcatClient; 362 } 363 364 public String getDelegationToken() { 365 return this.token; 366 } 367 } 368 369 static class HCatContext extends Context { 370 371 private HCatClient hcatClient; 372 private String delegationToken; 373 374 /** 375 * Create a HCatContext that can be used to access a hcat URI 376 * 377 * @param conf Configuration to access the URI 378 * @param user name of the user the URI should be accessed as 379 * @param hcatClient HCatClient to talk to hcatalog server 380 */ 381 public HCatContext(Configuration conf, String user, HCatClient hcatClient) { 382 super(conf, user); 383 this.hcatClient = hcatClient; 384 } 385 386 public HCatContext(Configuration conf, String user, HCatClientWithToken hcatClient) { 387 super(conf, user); 388 this.hcatClient = hcatClient.getHCatClient(); 389 this.delegationToken = hcatClient.getDelegationToken(); 390 } 391 392 /** 393 * Get the HCatClient to talk to hcatalog server 394 * 395 * @return HCatClient to talk to hcatalog server 396 */ 397 public HCatClient getHCatClient() { 398 return hcatClient; 399 } 400 401 /** 402 * Get the Delegation token to access HCat 403 * 404 * @return delegationToken 405 */ 406 public String getDelegationToken() { 407 return delegationToken; 408 } 409 410 @Override 411 public void destroy() { 412 try { 413 hcatClient.close(); 414 delegationToken = null; 415 } 416 catch (Exception ignore) { 417 XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore); 418 } 419 } 420 421 } 422 423}