001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.dependency; 019 020import java.io.IOException; 021import java.net.URI; 022import java.net.URISyntaxException; 023import java.util.Arrays; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hive.conf.HiveConf; 032import org.apache.hadoop.security.UserGroupInformation; 033import org.apache.hive.hcatalog.api.ConnectionFailureException; 034import org.apache.hive.hcatalog.api.HCatClient; 035import org.apache.hive.hcatalog.api.HCatPartition; 036import org.apache.hive.hcatalog.common.HCatException; 037import org.apache.oozie.ErrorCode; 038import org.apache.oozie.action.hadoop.HCatLauncherURIHandler; 039import org.apache.oozie.action.hadoop.LauncherURIHandler; 040import org.apache.oozie.dependency.hcat.HCatMessageHandler; 041import org.apache.oozie.service.HCatAccessorException; 042import org.apache.oozie.service.HCatAccessorService; 043import org.apache.oozie.service.PartitionDependencyManagerService; 044import org.apache.oozie.service.Services; 045import org.apache.oozie.service.URIHandlerService; 046import org.apache.oozie.util.HCatURI; 047import org.apache.oozie.util.XLog; 048 049public class HCatURIHandler implements URIHandler { 050 051 private Set<String> supportedSchemes; 052 private Map<String, DependencyType> dependencyTypes; 053 private List<Class<?>> classesToShip; 054 055 @Override 056 public void init(Configuration conf) { 057 dependencyTypes = new HashMap<String, DependencyType>(); 058 supportedSchemes = new HashSet<String>(); 059 String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX 060 + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat"); 061 supportedSchemes.addAll(Arrays.asList(schemes)); 062 classesToShip = new HCatLauncherURIHandler().getClassesForLauncher(); 063 } 064 065 @Override 066 public Set<String> getSupportedSchemes() { 067 return supportedSchemes; 068 } 069 070 @Override 071 public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() { 072 return HCatLauncherURIHandler.class; 073 } 074 075 @Override 076 public List<Class<?>> getClassesForLauncher() { 077 return classesToShip; 078 } 079 080 @Override 081 public DependencyType getDependencyType(URI uri) throws URIHandlerException { 082 DependencyType depType = DependencyType.PULL; 083 // Not initializing in constructor as this will be part of oozie.services.ext 084 // and will be initialized after URIHandlerService 085 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 086 if (hcatService != null) { 087 depType = dependencyTypes.get(uri.getAuthority()); 088 if (depType == null) { 089 depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL; 090 dependencyTypes.put(uri.getAuthority(), depType); 091 } 092 } 093 return depType; 094 } 095 096 @Override 097 public void registerForNotification(URI uri, Configuration conf, String user, String actionID) 098 throws URIHandlerException { 099 HCatURI hcatURI; 100 try { 101 hcatURI = new HCatURI(uri); 102 } 103 catch (URISyntaxException e) { 104 throw new URIHandlerException(ErrorCode.E0906, uri, e); 105 } 106 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 107 if (!hcatService.isRegisteredForNotification(hcatURI)) { 108 HCatClient client = getHCatClient(uri, conf, user); 109 try { 110 String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable()); 111 if (topic == null) { 112 return; 113 } 114 hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority())); 115 } 116 catch (HCatException e) { 117 throw new HCatAccessorException(ErrorCode.E1501, e); 118 } 119 finally { 120 closeQuietly(client, true); 121 } 122 } 123 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 124 pdmService.addMissingDependency(hcatURI, actionID); 125 } 126 127 @Override 128 public boolean unregisterFromNotification(URI uri, String actionID) { 129 HCatURI hcatURI; 130 try { 131 hcatURI = new HCatURI(uri); 132 } 133 catch (URISyntaxException e) { 134 throw new RuntimeException(e); // Unexpected at this point 135 } 136 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 137 return pdmService.removeMissingDependency(hcatURI, actionID); 138 } 139 140 @Override 141 public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException { 142 HCatClient client = getHCatClient(uri, conf, user); 143 return new HCatContext(conf, user, client); 144 } 145 146 @Override 147 public boolean exists(URI uri, Context context) throws URIHandlerException { 148 HCatClient client = ((HCatContext) context).getHCatClient(); 149 return exists(uri, client, false); 150 } 151 152 @Override 153 public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException { 154 HCatClient client = getHCatClient(uri, conf, user); 155 return exists(uri, client, true); 156 } 157 158 @Override 159 public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException { 160 return uri; 161 } 162 163 @Override 164 public void validate(String uri) throws URIHandlerException { 165 try { 166 new HCatURI(uri); // will fail if uri syntax is incorrect 167 } 168 catch (URISyntaxException e) { 169 throw new URIHandlerException(ErrorCode.E0906, uri, e); 170 } 171 172 } 173 174 @Override 175 public void destroy() { 176 177 } 178 179 private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws HCatAccessorException { 180 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 181 if (hcatService.getHCatConf() != null) { 182 conf = hcatService.getHCatConf(); 183 } 184 final HiveConf hiveConf = new HiveConf(conf, this.getClass()); 185 String serverURI = getMetastoreConnectURI(uri); 186 if (!serverURI.equals("")) { 187 hiveConf.set("hive.metastore.local", "false"); 188 } 189 hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI); 190 try { 191 XLog.getLog(HCatURIHandler.class).info( 192 "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user, 193 UserGroupInformation.getLoginUser(), serverURI); 194 195 // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs 196 // We are good to connect as the oozie user since listPartitions does not require 197 // authorization 198 /* 199 UserGroupInformation ugi = ugiService.getProxyUser(user); 200 return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() { 201 public HCatClient run() throws Exception { 202 return HCatClient.create(hiveConf); 203 } 204 }); 205 */ 206 207 return HCatClient.create(hiveConf); 208 } 209 catch (HCatException e) { 210 throw new HCatAccessorException(ErrorCode.E1501, e); 211 } 212 catch (IOException e) { 213 throw new HCatAccessorException(ErrorCode.E1501, e); 214 } 215 216 } 217 218 private String getMetastoreConnectURI(URI uri) { 219 String metastoreURI; 220 // For unit tests 221 if (uri.getAuthority().equals("unittest-local")) { 222 metastoreURI = ""; 223 } 224 else { 225 // Hardcoding hcat to thrift mapping till support for webhcat(templeton) 226 // is added 227 metastoreURI = "thrift://" + uri.getAuthority(); 228 } 229 return metastoreURI; 230 } 231 232 private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException { 233 try { 234 HCatURI hcatURI = new HCatURI(uri.toString()); 235 List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(), 236 hcatURI.getPartitionMap()); 237 return (partitions != null && !partitions.isEmpty()); 238 } 239 catch (ConnectionFailureException e) { 240 throw new HCatAccessorException(ErrorCode.E1501, e); 241 } 242 catch (HCatException e) { 243 throw new HCatAccessorException(ErrorCode.E0902, e); 244 } 245 catch (URISyntaxException e) { 246 throw new HCatAccessorException(ErrorCode.E0902, e); 247 } 248 finally { 249 closeQuietly(client, closeClient); 250 } 251 } 252 253 private void closeQuietly(HCatClient client, boolean close) { 254 if (close && client != null) { 255 try { 256 client.close(); 257 } 258 catch (Exception ignore) { 259 XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore); 260 } 261 } 262 } 263 264 static class HCatContext extends Context { 265 266 private HCatClient hcatClient; 267 268 /** 269 * Create a HCatContext that can be used to access a hcat URI 270 * 271 * @param conf Configuration to access the URI 272 * @param user name of the user the URI should be accessed as 273 * @param hcatClient HCatClient to talk to hcatalog server 274 */ 275 public HCatContext(Configuration conf, String user, HCatClient hcatClient) { 276 super(conf, user); 277 this.hcatClient = hcatClient; 278 } 279 280 /** 281 * Get the HCatClient to talk to hcatalog server 282 * 283 * @return HCatClient to talk to hcatalog server 284 */ 285 public HCatClient getHCatClient() { 286 return hcatClient; 287 } 288 289 @Override 290 public void destroy() { 291 try { 292 hcatClient.close(); 293 } 294 catch (Exception ignore) { 295 XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore); 296 } 297 } 298 299 } 300 301}