This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.dependency;
019
020 import java.io.IOException;
021 import java.net.URI;
022 import java.net.URISyntaxException;
023 import java.util.Arrays;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Set;
029
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.hive.conf.HiveConf;
032 import org.apache.hadoop.security.UserGroupInformation;
033 import org.apache.hcatalog.api.ConnectionFailureException;
034 import org.apache.hcatalog.api.HCatClient;
035 import org.apache.hcatalog.api.HCatPartition;
036 import org.apache.hcatalog.common.HCatException;
037 import org.apache.oozie.ErrorCode;
038 import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
039 import org.apache.oozie.action.hadoop.LauncherURIHandler;
040 import org.apache.oozie.dependency.hcat.HCatMessageHandler;
041 import org.apache.oozie.service.HCatAccessorException;
042 import org.apache.oozie.service.HCatAccessorService;
043 import org.apache.oozie.service.PartitionDependencyManagerService;
044 import org.apache.oozie.service.Services;
045 import org.apache.oozie.service.URIHandlerService;
046 import org.apache.oozie.util.HCatURI;
047 import org.apache.oozie.util.XLog;
048
049 public class HCatURIHandler implements URIHandler {
050
051 private Set<String> supportedSchemes;
052 private Map<String, DependencyType> dependencyTypes;
053 private List<Class<?>> classesToShip;
054
055 @Override
056 public void init(Configuration conf) {
057 dependencyTypes = new HashMap<String, DependencyType>();
058 supportedSchemes = new HashSet<String>();
059 String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
060 + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
061 supportedSchemes.addAll(Arrays.asList(schemes));
062 classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
063 }
064
065 @Override
066 public Set<String> getSupportedSchemes() {
067 return supportedSchemes;
068 }
069
070 @Override
071 public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
072 return HCatLauncherURIHandler.class;
073 }
074
075 @Override
076 public List<Class<?>> getClassesForLauncher() {
077 return classesToShip;
078 }
079
080 @Override
081 public DependencyType getDependencyType(URI uri) throws URIHandlerException {
082 DependencyType depType = DependencyType.PULL;
083 // Not initializing in constructor as this will be part of oozie.services.ext
084 // and will be initialized after URIHandlerService
085 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
086 if (hcatService != null) {
087 depType = dependencyTypes.get(uri.getAuthority());
088 if (depType == null) {
089 depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
090 dependencyTypes.put(uri.getAuthority(), depType);
091 }
092 }
093 return depType;
094 }
095
096 @Override
097 public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
098 throws URIHandlerException {
099 HCatURI hcatURI;
100 try {
101 hcatURI = new HCatURI(uri);
102 }
103 catch (URISyntaxException e) {
104 throw new URIHandlerException(ErrorCode.E0906, uri, e);
105 }
106 HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
107 if (!hcatService.isRegisteredForNotification(hcatURI)) {
108 HCatClient client = getHCatClient(uri, conf, user);
109 try {
110 String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable());
111 if (topic == null) {
112 return;
113 }
114 hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
115 }
116 catch (HCatException e) {
117 throw new HCatAccessorException(ErrorCode.E1501, e);
118 }
119 finally {
120 closeQuietly(client, true);
121 }
122 }
123 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
124 pdmService.addMissingDependency(hcatURI, actionID);
125 }
126
127 @Override
128 public boolean unregisterFromNotification(URI uri, String actionID) {
129 HCatURI hcatURI;
130 try {
131 hcatURI = new HCatURI(uri);
132 }
133 catch (URISyntaxException e) {
134 throw new RuntimeException(e); // Unexpected at this point
135 }
136 PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
137 return pdmService.removeMissingDependency(hcatURI, actionID);
138 }
139
140 @Override
141 public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
142 HCatClient client = getHCatClient(uri, conf, user);
143 return new HCatContext(conf, user, client);
144 }
145
146 @Override
147 public boolean exists(URI uri, Context context) throws URIHandlerException {
148 HCatClient client = ((HCatContext) context).getHCatClient();
149 return exists(uri, client, false);
150 }
151
152 @Override
153 public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
154 HCatClient client = getHCatClient(uri, conf, user);
155 return exists(uri, client, true);
156 }
157
158 @Override
159 public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
160 return uri;
161 }
162
163 @Override
164 public void validate(String uri) throws URIHandlerException {
165 try {
166 new HCatURI(uri); // will fail if uri syntax is incorrect
167 }
168 catch (URISyntaxException e) {
169 throw new URIHandlerException(ErrorCode.E0906, uri, e);
170 }
171
172 }
173
174 @Override
175 public void destroy() {
176
177 }
178
179 private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws HCatAccessorException {
180 final HiveConf hiveConf = new HiveConf(conf, this.getClass());
181 String serverURI = getMetastoreConnectURI(uri);
182 if (!serverURI.equals("")) {
183 hiveConf.set("hive.metastore.local", "false");
184 }
185 hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
186 try {
187 XLog.getLog(HCatURIHandler.class).info(
188 "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
189 UserGroupInformation.getLoginUser(), serverURI);
190
191 // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs
192 // We are good to connect as the oozie user since listPartitions does not require
193 // authorization
194 /*
195 UserGroupInformation ugi = ugiService.getProxyUser(user);
196 return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
197 public HCatClient run() throws Exception {
198 return HCatClient.create(hiveConf);
199 }
200 });
201 */
202
203 return HCatClient.create(hiveConf);
204 }
205 catch (HCatException e) {
206 throw new HCatAccessorException(ErrorCode.E1501, e);
207 }
208 catch (IOException e) {
209 throw new HCatAccessorException(ErrorCode.E1501, e);
210 }
211
212 }
213
214 private String getMetastoreConnectURI(URI uri) {
215 String metastoreURI;
216 // For unit tests
217 if (uri.getAuthority().equals("unittest-local")) {
218 metastoreURI = "";
219 }
220 else {
221 // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
222 // is added
223 metastoreURI = "thrift://" + uri.getAuthority();
224 }
225 return metastoreURI;
226 }
227
228 private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException {
229 try {
230 HCatURI hcatURI = new HCatURI(uri.toString());
231 List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
232 hcatURI.getPartitionMap());
233 return (partitions != null && !partitions.isEmpty());
234 }
235 catch (ConnectionFailureException e) {
236 throw new HCatAccessorException(ErrorCode.E1501, e);
237 }
238 catch (HCatException e) {
239 throw new HCatAccessorException(ErrorCode.E0902, e);
240 }
241 catch (URISyntaxException e) {
242 throw new HCatAccessorException(ErrorCode.E0902, e);
243 }
244 finally {
245 closeQuietly(client, closeClient);
246 }
247 }
248
249 private void closeQuietly(HCatClient client, boolean close) {
250 if (close && client != null) {
251 try {
252 client.close();
253 }
254 catch (Exception ignore) {
255 XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore);
256 }
257 }
258 }
259
260 static class HCatContext extends Context {
261
262 private HCatClient hcatClient;
263
264 /**
265 * Create a HCatContext that can be used to access a hcat URI
266 *
267 * @param conf Configuration to access the URI
268 * @param user name of the user the URI should be accessed as
269 * @param hcatClient HCatClient to talk to hcatalog server
270 */
271 public HCatContext(Configuration conf, String user, HCatClient hcatClient) {
272 super(conf, user);
273 this.hcatClient = hcatClient;
274 }
275
276 /**
277 * Get the HCatClient to talk to hcatalog server
278 *
279 * @return HCatClient to talk to hcatalog server
280 */
281 public HCatClient getHCatClient() {
282 return hcatClient;
283 }
284
285 @Override
286 public void destroy() {
287 try {
288 hcatClient.close();
289 }
290 catch (Exception ignore) {
291 XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
292 }
293 }
294
295 }
296
297 }