001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.dependency; 019 020 import java.io.IOException; 021 import java.net.URI; 022 import java.net.URISyntaxException; 023 import java.util.Arrays; 024 import java.util.HashMap; 025 import java.util.HashSet; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.Set; 029 030 import org.apache.hadoop.conf.Configuration; 031 import org.apache.hadoop.hive.conf.HiveConf; 032 import org.apache.hadoop.security.UserGroupInformation; 033 import org.apache.hcatalog.api.ConnectionFailureException; 034 import org.apache.hcatalog.api.HCatClient; 035 import org.apache.hcatalog.api.HCatPartition; 036 import org.apache.hcatalog.common.HCatException; 037 import org.apache.oozie.ErrorCode; 038 import org.apache.oozie.action.hadoop.HCatLauncherURIHandler; 039 import org.apache.oozie.action.hadoop.LauncherURIHandler; 040 import org.apache.oozie.dependency.hcat.HCatMessageHandler; 041 import org.apache.oozie.service.HCatAccessorException; 042 import org.apache.oozie.service.HCatAccessorService; 043 import org.apache.oozie.service.PartitionDependencyManagerService; 044 import org.apache.oozie.service.Services; 045 import org.apache.oozie.service.URIHandlerService; 046 import org.apache.oozie.util.HCatURI; 047 import org.apache.oozie.util.XLog; 048 049 public class HCatURIHandler implements URIHandler { 050 051 private Set<String> supportedSchemes; 052 private Map<String, DependencyType> dependencyTypes; 053 private List<Class<?>> classesToShip; 054 055 @Override 056 public void init(Configuration conf) { 057 dependencyTypes = new HashMap<String, DependencyType>(); 058 supportedSchemes = new HashSet<String>(); 059 String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX 060 + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat"); 061 supportedSchemes.addAll(Arrays.asList(schemes)); 062 classesToShip = new HCatLauncherURIHandler().getClassesForLauncher(); 063 } 064 065 @Override 066 public Set<String> getSupportedSchemes() { 067 return supportedSchemes; 068 } 069 070 @Override 071 public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() { 072 return HCatLauncherURIHandler.class; 073 } 074 075 @Override 076 public List<Class<?>> getClassesForLauncher() { 077 return classesToShip; 078 } 079 080 @Override 081 public DependencyType getDependencyType(URI uri) throws URIHandlerException { 082 DependencyType depType = DependencyType.PULL; 083 // Not initializing in constructor as this will be part of oozie.services.ext 084 // and will be initialized after URIHandlerService 085 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 086 if (hcatService != null) { 087 depType = dependencyTypes.get(uri.getAuthority()); 088 if (depType == null) { 089 depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL; 090 dependencyTypes.put(uri.getAuthority(), depType); 091 } 092 } 093 return depType; 094 } 095 096 @Override 097 public void registerForNotification(URI uri, Configuration conf, String user, String actionID) 098 throws URIHandlerException { 099 HCatURI hcatURI; 100 try { 101 hcatURI = new HCatURI(uri); 102 } 103 catch (URISyntaxException e) { 104 throw new URIHandlerException(ErrorCode.E0906, uri, e); 105 } 106 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); 107 if (!hcatService.isRegisteredForNotification(hcatURI)) { 108 HCatClient client = getHCatClient(uri, conf, user); 109 try { 110 String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable()); 111 if (topic == null) { 112 return; 113 } 114 hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority())); 115 } 116 catch (HCatException e) { 117 throw new HCatAccessorException(ErrorCode.E1501, e); 118 } 119 finally { 120 closeQuietly(client, true); 121 } 122 } 123 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 124 pdmService.addMissingDependency(hcatURI, actionID); 125 } 126 127 @Override 128 public boolean unregisterFromNotification(URI uri, String actionID) { 129 HCatURI hcatURI; 130 try { 131 hcatURI = new HCatURI(uri); 132 } 133 catch (URISyntaxException e) { 134 throw new RuntimeException(e); // Unexpected at this point 135 } 136 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class); 137 return pdmService.removeMissingDependency(hcatURI, actionID); 138 } 139 140 @Override 141 public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException { 142 HCatClient client = getHCatClient(uri, conf, user); 143 return new HCatContext(conf, user, client); 144 } 145 146 @Override 147 public boolean exists(URI uri, Context context) throws URIHandlerException { 148 HCatClient client = ((HCatContext) context).getHCatClient(); 149 return exists(uri, client, false); 150 } 151 152 @Override 153 public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException { 154 HCatClient client = getHCatClient(uri, conf, user); 155 return exists(uri, client, true); 156 } 157 158 @Override 159 public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException { 160 return uri; 161 } 162 163 @Override 164 public void validate(String uri) throws URIHandlerException { 165 try { 166 new HCatURI(uri); // will fail if uri syntax is incorrect 167 } 168 catch (URISyntaxException e) { 169 throw new URIHandlerException(ErrorCode.E0906, uri, e); 170 } 171 172 } 173 174 @Override 175 public void destroy() { 176 177 } 178 179 private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws HCatAccessorException { 180 final HiveConf hiveConf = new HiveConf(conf, this.getClass()); 181 String serverURI = getMetastoreConnectURI(uri); 182 if (!serverURI.equals("")) { 183 hiveConf.set("hive.metastore.local", "false"); 184 } 185 hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI); 186 try { 187 XLog.getLog(HCatURIHandler.class).info( 188 "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user, 189 UserGroupInformation.getLoginUser(), serverURI); 190 191 // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs 192 // We are good to connect as the oozie user since listPartitions does not require 193 // authorization 194 /* 195 UserGroupInformation ugi = ugiService.getProxyUser(user); 196 return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() { 197 public HCatClient run() throws Exception { 198 return HCatClient.create(hiveConf); 199 } 200 }); 201 */ 202 203 return HCatClient.create(hiveConf); 204 } 205 catch (HCatException e) { 206 throw new HCatAccessorException(ErrorCode.E1501, e); 207 } 208 catch (IOException e) { 209 throw new HCatAccessorException(ErrorCode.E1501, e); 210 } 211 212 } 213 214 private String getMetastoreConnectURI(URI uri) { 215 String metastoreURI; 216 // For unit tests 217 if (uri.getAuthority().equals("unittest-local")) { 218 metastoreURI = ""; 219 } 220 else { 221 // Hardcoding hcat to thrift mapping till support for webhcat(templeton) 222 // is added 223 metastoreURI = "thrift://" + uri.getAuthority(); 224 } 225 return metastoreURI; 226 } 227 228 private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException { 229 try { 230 HCatURI hcatURI = new HCatURI(uri.toString()); 231 List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(), 232 hcatURI.getPartitionMap()); 233 return (partitions != null && !partitions.isEmpty()); 234 } 235 catch (ConnectionFailureException e) { 236 throw new HCatAccessorException(ErrorCode.E1501, e); 237 } 238 catch (HCatException e) { 239 throw new HCatAccessorException(ErrorCode.E0902, e); 240 } 241 catch (URISyntaxException e) { 242 throw new HCatAccessorException(ErrorCode.E0902, e); 243 } 244 finally { 245 closeQuietly(client, closeClient); 246 } 247 } 248 249 private void closeQuietly(HCatClient client, boolean close) { 250 if (close && client != null) { 251 try { 252 client.close(); 253 } 254 catch (Exception ignore) { 255 XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore); 256 } 257 } 258 } 259 260 static class HCatContext extends Context { 261 262 private HCatClient hcatClient; 263 264 /** 265 * Create a HCatContext that can be used to access a hcat URI 266 * 267 * @param conf Configuration to access the URI 268 * @param user name of the user the URI should be accessed as 269 * @param hcatClient HCatClient to talk to hcatalog server 270 */ 271 public HCatContext(Configuration conf, String user, HCatClient hcatClient) { 272 super(conf, user); 273 this.hcatClient = hcatClient; 274 } 275 276 /** 277 * Get the HCatClient to talk to hcatalog server 278 * 279 * @return HCatClient to talk to hcatalog server 280 */ 281 public HCatClient getHCatClient() { 282 return hcatClient; 283 } 284 285 @Override 286 public void destroy() { 287 try { 288 hcatClient.close(); 289 } 290 catch (Exception ignore) { 291 XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore); 292 } 293 } 294 295 } 296 297 }