This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.dependency;
020
021import java.io.IOException;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.security.PrivilegedExceptionAction;
025import java.util.Arrays;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hive.conf.HiveConf;
034import org.apache.hadoop.hive.shims.ShimLoader;
035import org.apache.hadoop.security.UserGroupInformation;
036import org.apache.hive.hcatalog.api.ConnectionFailureException;
037import org.apache.hive.hcatalog.api.HCatClient;
038import org.apache.hive.hcatalog.api.HCatPartition;
039import org.apache.hive.hcatalog.common.HCatException;
040import org.apache.oozie.ErrorCode;
041import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
042import org.apache.oozie.action.hadoop.LauncherURIHandler;
043import org.apache.oozie.dependency.hcat.HCatMessageHandler;
044import org.apache.oozie.service.HCatAccessorException;
045import org.apache.oozie.service.HCatAccessorService;
046import org.apache.oozie.service.PartitionDependencyManagerService;
047import org.apache.oozie.service.Services;
048import org.apache.oozie.service.URIHandlerService;
049import org.apache.oozie.util.HCatURI;
050import org.apache.oozie.util.XLog;
051
052public class HCatURIHandler implements URIHandler {
053
054    private Set<String> supportedSchemes;
055    private Map<String, DependencyType> dependencyTypes;
056    private List<Class<?>> classesToShip;
057
058    @Override
059    public void init(Configuration conf) {
060        dependencyTypes = new HashMap<String, DependencyType>();
061        supportedSchemes = new HashSet<String>();
062        String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
063                + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
064        supportedSchemes.addAll(Arrays.asList(schemes));
065        classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
066    }
067
068    @Override
069    public Set<String> getSupportedSchemes() {
070        return supportedSchemes;
071    }
072
073    @Override
074    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
075        return HCatLauncherURIHandler.class;
076    }
077
078    @Override
079    public List<Class<?>> getClassesForLauncher() {
080        return classesToShip;
081    }
082
083    @Override
084    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
085        DependencyType depType = DependencyType.PULL;
086        // Not initializing in constructor as this will be part of oozie.services.ext
087        // and will be initialized after URIHandlerService
088        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
089        if (hcatService != null) {
090            depType = dependencyTypes.get(uri.getAuthority());
091            if (depType == null) {
092                depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
093                dependencyTypes.put(uri.getAuthority(), depType);
094            }
095        }
096        return depType;
097    }
098
099    @Override
100    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
101            throws URIHandlerException {
102        HCatURI hcatURI;
103        try {
104            hcatURI = new HCatURI(uri);
105        }
106        catch (URISyntaxException e) {
107            throw new URIHandlerException(ErrorCode.E0906, uri, e);
108        }
109        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
110        if (!hcatService.isRegisteredForNotification(hcatURI)) {
111            HCatClient client = getHCatClient(uri, conf);
112            try {
113                String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable());
114                if (topic == null) {
115                    return;
116                }
117                hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
118            }
119            catch (HCatException e) {
120                throw new HCatAccessorException(ErrorCode.E1501, e);
121            }
122            finally {
123                closeQuietly(client, null, true);
124            }
125        }
126        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
127        pdmService.addMissingDependency(hcatURI, actionID);
128    }
129
130    @Override
131    public boolean unregisterFromNotification(URI uri, String actionID) {
132        HCatURI hcatURI;
133        try {
134            hcatURI = new HCatURI(uri);
135        }
136        catch (URISyntaxException e) {
137            throw new RuntimeException(e); // Unexpected at this point
138        }
139        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
140        return pdmService.removeMissingDependency(hcatURI, actionID);
141    }
142
143    @Override
144    public Context getContext(URI uri, Configuration conf, String user, boolean readOnly)
145            throws URIHandlerException {
146        HCatContext context = null;
147        //read operations are allowed for any user in HCat and so accessing as Oozie server itself
148        //For write operations, perform doAs as user
149        if (readOnly) {
150            HCatClient client = getHCatClient(uri, conf);
151            context = new HCatContext(conf, user, client);
152        }
153        else {
154            HCatClientWithToken client = getHCatClient(uri, conf, user);
155            context = new HCatContext(conf, user, client);
156        }
157        return context;
158    }
159
160    @Override
161    public boolean exists(URI uri, Context context) throws URIHandlerException {
162        HCatClient client = ((HCatContext) context).getHCatClient();
163        return exists(uri, client, false);
164    }
165
166    @Override
167    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
168        HCatClient client = getHCatClient(uri, conf);
169        return exists(uri, client, true);
170    }
171
172    @Override
173    public void delete(URI uri, Context context) throws URIHandlerException {
174        HCatClient client = ((HCatContext) context).getHCatClient();
175        try {
176            HCatURI hcatUri  = new HCatURI(uri);
177            client.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true);
178        }
179        catch (URISyntaxException e) {
180            throw new HCatAccessorException(ErrorCode.E1501, e);
181        }
182        catch (HCatException e) {
183            throw new HCatAccessorException(ErrorCode.E1501, e);
184        }
185    }
186
187    @Override
188    public void delete(URI uri, Configuration conf, String user) throws URIHandlerException {
189        HCatClientWithToken client = null;
190        try {
191            HCatURI hcatUri = new HCatURI(uri);
192            client = getHCatClient(uri, conf, user);
193            client.getHCatClient().dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true);
194        }
195        catch (URISyntaxException e){
196            throw new HCatAccessorException(ErrorCode.E1501, e);
197        }
198        catch (HCatException e) {
199            throw new HCatAccessorException(ErrorCode.E1501, e);
200        }
201        finally {
202            closeQuietly(client.getHCatClient(), client.getDelegationToken(),true);
203        }
204    }
205
206    @Override
207    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
208        return uri;
209    }
210
211    @Override
212    public void validate(String uri) throws URIHandlerException {
213        try {
214            new HCatURI(uri); // will fail if uri syntax is incorrect
215        }
216        catch (URISyntaxException e) {
217            throw new URIHandlerException(ErrorCode.E0906, uri, e);
218        }
219
220    }
221
222    @Override
223    public void destroy() {
224
225    }
226
227    private HiveConf getHiveConf(URI uri, Configuration conf){
228        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
229        if (hcatService.getHCatConf() != null) {
230            conf = hcatService.getHCatConf();
231        }
232        HiveConf hiveConf = new HiveConf(conf, this.getClass());
233        String serverURI = getMetastoreConnectURI(uri);
234        if (!serverURI.equals("")) {
235            hiveConf.set("hive.metastore.local", "false");
236        }
237        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
238        return hiveConf;
239    }
240
241    private HCatClient getHCatClient(URI uri, Configuration conf) throws HCatAccessorException {
242        HiveConf hiveConf = getHiveConf(uri, conf);
243        try {
244            XLog.getLog(HCatURIHandler.class).info("Creating HCatClient for login_user [{0}] and server [{1}] ",
245                    UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
246            return HCatClient.create(hiveConf);
247        }
248        catch (HCatException e) {
249            throw new HCatAccessorException(ErrorCode.E1501, e);
250        }
251        catch (IOException e) {
252            throw new HCatAccessorException(ErrorCode.E1501, e);
253        }
254    }
255
256    private HCatClientWithToken getHCatClient(URI uri, Configuration conf, String user)
257            throws HCatAccessorException {
258        final HiveConf hiveConf = getHiveConf(uri, conf);
259        String delegationToken = null;
260        try {
261            // Get UGI to doAs() as the specified user
262            UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
263            // Define the label for the Delegation Token for the HCat instance.
264            hiveConf.set("hive.metastore.token.signature", "HCatTokenSignature");
265            if (hiveConf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, false)) {
266                HCatClient tokenClient = null;
267                try {
268                    // Retrieve Delegation token for HCatalog
269                    tokenClient = HCatClient.create(hiveConf);
270                    delegationToken = tokenClient.getDelegationToken(user, UserGroupInformation.getLoginUser()
271                            .getUserName());
272                    // Store Delegation token in the UGI
273                    ShimLoader.getHadoopShims().setTokenStr(ugi, delegationToken,
274                            hiveConf.get("hive.metastore.token.signature"));
275                }
276                finally {
277                    if (tokenClient != null)
278                        tokenClient.close();
279                }
280            }
281            XLog.getLog(HCatURIHandler.class).info(
282                    "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
283                    UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
284            HCatClient hcatClient = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
285                @Override
286                public HCatClient run() throws Exception {
287                    HCatClient client = HCatClient.create(hiveConf);
288                    return client;
289                }
290            });
291            HCatClientWithToken clientWithToken = new HCatClientWithToken(hcatClient, delegationToken);
292            return clientWithToken;
293        }
294        catch (IOException e) {
295            throw new HCatAccessorException(ErrorCode.E1501, e.getMessage());
296        }
297        catch (Exception e) {
298            throw new HCatAccessorException(ErrorCode.E1501, e.getMessage());
299        }
300    }
301
302    private String getMetastoreConnectURI(URI uri) {
303        String metastoreURI;
304        // For unit tests
305        if (uri.getAuthority().equals("unittest-local")) {
306            metastoreURI = "";
307        }
308        else {
309            // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
310            // is added
311            metastoreURI = "thrift://" + uri.getAuthority();
312        }
313        return metastoreURI;
314    }
315
316    private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException {
317        try {
318            HCatURI hcatURI = new HCatURI(uri.toString());
319            List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
320                    hcatURI.getPartitionMap());
321            return (partitions != null && !partitions.isEmpty());
322        }
323        catch (ConnectionFailureException e) {
324            throw new HCatAccessorException(ErrorCode.E1501, e);
325        }
326        catch (HCatException e) {
327            throw new HCatAccessorException(ErrorCode.E0902, e);
328        }
329        catch (URISyntaxException e) {
330            throw new HCatAccessorException(ErrorCode.E0902, e);
331        }
332        finally {
333            closeQuietly(client, null, closeClient);
334        }
335    }
336
337    private void closeQuietly(HCatClient client, String delegationToken, boolean close) {
338        if (close && client != null) {
339            try {
340                if(delegationToken != null && !delegationToken.isEmpty()) {
341                    client.cancelDelegationToken(delegationToken);
342                }
343                client.close();
344            }
345            catch (Exception ignore) {
346                XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore);
347            }
348        }
349    }
350
351    class HCatClientWithToken {
352        private HCatClient hcatClient;
353        private String token;
354
355        public HCatClientWithToken(HCatClient client, String delegationToken) {
356            this.hcatClient = client;
357            this.token = delegationToken;
358        }
359
360        public HCatClient getHCatClient() {
361            return this.hcatClient;
362        }
363
364        public String getDelegationToken() {
365            return this.token;
366        }
367    }
368
369    static class HCatContext extends Context {
370
371        private HCatClient hcatClient;
372        private String delegationToken;
373
374        /**
375         * Create a HCatContext that can be used to access a hcat URI
376         *
377         * @param conf Configuration to access the URI
378         * @param user name of the user the URI should be accessed as
379         * @param hcatClient HCatClient to talk to hcatalog server
380         */
381        public HCatContext(Configuration conf, String user, HCatClient hcatClient) {
382            super(conf, user);
383            this.hcatClient = hcatClient;
384        }
385
386        public HCatContext(Configuration conf, String user, HCatClientWithToken hcatClient) {
387            super(conf, user);
388            this.hcatClient = hcatClient.getHCatClient();
389            this.delegationToken = hcatClient.getDelegationToken();
390        }
391
392        /**
393         * Get the HCatClient to talk to hcatalog server
394         *
395         * @return HCatClient to talk to hcatalog server
396         */
397        public HCatClient getHCatClient() {
398            return hcatClient;
399        }
400
401        /**
402         * Get the Delegation token to access HCat
403         *
404         * @return delegationToken
405         */
406        public String getDelegationToken() {
407            return delegationToken;
408        }
409
410        @Override
411        public void destroy() {
412            try {
413                hcatClient.close();
414                delegationToken = null;
415            }
416            catch (Exception ignore) {
417                XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
418            }
419        }
420
421    }
422
423}