001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.dependency;
020
021import java.io.IOException;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.security.PrivilegedExceptionAction;
025import java.util.Arrays;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hive.conf.HiveConf;
034import org.apache.hadoop.security.UserGroupInformation;
035import org.apache.hive.hcatalog.api.ConnectionFailureException;
036import org.apache.hive.hcatalog.api.HCatClient;
037import org.apache.hive.hcatalog.api.HCatPartition;
038import org.apache.hive.hcatalog.common.HCatException;
039import org.apache.oozie.ErrorCode;
040import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
041import org.apache.oozie.action.hadoop.LauncherURIHandler;
042import org.apache.oozie.dependency.hcat.HCatMessageHandler;
043import org.apache.oozie.service.HCatAccessorException;
044import org.apache.oozie.service.HCatAccessorService;
045import org.apache.oozie.service.PartitionDependencyManagerService;
046import org.apache.oozie.service.Services;
047import org.apache.oozie.service.URIHandlerService;
048import org.apache.oozie.util.HCatURI;
049import org.apache.oozie.util.XLog;
050import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
051import org.apache.hadoop.io.Text;
052import org.apache.hadoop.security.token.Token;
053
054public class HCatURIHandler implements URIHandler {
055
056    private Set<String> supportedSchemes;
057    private Map<String, DependencyType> dependencyTypes;
058    private List<Class<?>> classesToShip;
059
060    @Override
061    public void init(Configuration conf) {
062        dependencyTypes = new HashMap<String, DependencyType>();
063        supportedSchemes = new HashSet<String>();
064        String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
065                + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
066        supportedSchemes.addAll(Arrays.asList(schemes));
067        classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
068    }
069
070    @Override
071    public Set<String> getSupportedSchemes() {
072        return supportedSchemes;
073    }
074
075    @Override
076    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
077        return HCatLauncherURIHandler.class;
078    }
079
080    @Override
081    public List<Class<?>> getClassesForLauncher() {
082        return classesToShip;
083    }
084
085    @Override
086    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
087        DependencyType depType = DependencyType.PULL;
088        // Not initializing in constructor as this will be part of oozie.services.ext
089        // and will be initialized after URIHandlerService
090        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
091        if (hcatService != null) {
092            depType = dependencyTypes.get(uri.getAuthority());
093            if (depType == null) {
094                depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
095                dependencyTypes.put(uri.getAuthority(), depType);
096            }
097        }
098        return depType;
099    }
100
101    @Override
102    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
103            throws URIHandlerException {
104        HCatURI hcatURI;
105        try {
106            hcatURI = new HCatURI(uri);
107        }
108        catch (URISyntaxException e) {
109            throw new URIHandlerException(ErrorCode.E0906, uri, e);
110        }
111        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
112        if (!hcatService.isRegisteredForNotification(hcatURI)) {
113            HCatClient client = getHCatClient(uri, conf);
114            try {
115                String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable());
116                if (topic == null) {
117                    return;
118                }
119                hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
120            }
121            catch (HCatException e) {
122                throw new HCatAccessorException(ErrorCode.E1501, e);
123            }
124            finally {
125                closeQuietly(client, null, true);
126            }
127        }
128        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
129        pdmService.addMissingDependency(hcatURI, actionID);
130    }
131
132    @Override
133    public boolean unregisterFromNotification(URI uri, String actionID) {
134        HCatURI hcatURI;
135        try {
136            hcatURI = new HCatURI(uri);
137        }
138        catch (URISyntaxException e) {
139            throw new RuntimeException(e); // Unexpected at this point
140        }
141        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
142        return pdmService.removeMissingDependency(hcatURI, actionID);
143    }
144
145    @Override
146    public Context getContext(URI uri, Configuration conf, String user, boolean readOnly)
147            throws URIHandlerException {
148        HCatContext context = null;
149        //read operations are allowed for any user in HCat and so accessing as Oozie server itself
150        //For write operations, perform doAs as user
151        if (readOnly) {
152            HCatClient client = getHCatClient(uri, conf);
153            context = new HCatContext(conf, user, client);
154        }
155        else {
156            HCatClientWithToken client = getHCatClient(uri, conf, user);
157            context = new HCatContext(conf, user, client);
158        }
159        return context;
160    }
161
162    @Override
163    public boolean exists(URI uri, Context context) throws URIHandlerException {
164        HCatClient client = ((HCatContext) context).getHCatClient();
165        return exists(uri, client, false);
166    }
167
168    @Override
169    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
170        HCatClient client = getHCatClient(uri, conf);
171        return exists(uri, client, true);
172    }
173
174    @Override
175    public void delete(URI uri, Context context) throws URIHandlerException {
176        HCatClient client = ((HCatContext) context).getHCatClient();
177        try {
178            HCatURI hcatUri  = new HCatURI(uri);
179            if (!hcatUri.getPartitionMap().isEmpty()) {
180                client.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true);
181            } else {
182                client.dropTable(hcatUri.getDb(), hcatUri.getTable(), true);
183            }
184        }
185        catch (URISyntaxException e) {
186            throw new HCatAccessorException(ErrorCode.E1501, e);
187        }
188        catch (HCatException e) {
189            throw new HCatAccessorException(ErrorCode.E1501, e);
190        }
191    }
192
193    @Override
194    public void delete(URI uri, Configuration conf, String user) throws URIHandlerException {
195        HCatClientWithToken client = null;
196        HCatClient hCatClient = null;
197        try {
198            HCatURI hcatUri = new HCatURI(uri);
199            client = getHCatClient(uri, conf, user);
200            hCatClient = client.getHCatClient();
201            if (!hcatUri.getPartitionMap().isEmpty()) {
202                hCatClient.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true);
203            } else {
204                hCatClient.dropTable(hcatUri.getDb(), hcatUri.getTable(), true);
205            }
206        }
207        catch (URISyntaxException e){
208            throw new HCatAccessorException(ErrorCode.E1501, e);
209        }
210        catch (HCatException e) {
211            throw new HCatAccessorException(ErrorCode.E1501, e);
212        }
213        finally {
214            closeQuietly(hCatClient, client != null ? client.getDelegationToken() : null, true);
215        }
216    }
217
218    @Override
219    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
220        return uri;
221    }
222
223    @Override
224    public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException {
225        return uri;
226    }
227
228    @Override
229    public void validate(String uri) throws URIHandlerException {
230        try {
231            new HCatURI(uri); // will fail if uri syntax is incorrect
232        }
233        catch (URISyntaxException e) {
234            throw new URIHandlerException(ErrorCode.E0906, uri, e);
235        }
236
237    }
238
239    @Override
240    public void destroy() {
241
242    }
243
244    private HiveConf getHiveConf(URI uri, Configuration conf) throws HCatAccessorException {
245        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
246        if (hcatService.getHCatConf() != null) {
247            conf = hcatService.getHCatConf();
248        }
249        HiveConf hiveConf = new HiveConf(conf, this.getClass());
250        String serverURI = getMetastoreConnectURI(uri);
251        if (!serverURI.equals("")) {
252            hiveConf.set("hive.metastore.local", "false");
253        }
254        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
255        return hiveConf;
256    }
257
258    private HCatClient getHCatClient(URI uri, Configuration conf) throws HCatAccessorException {
259        HiveConf hiveConf = getHiveConf(uri, conf);
260        try {
261            XLog.getLog(HCatURIHandler.class).info("Creating HCatClient for login_user [{0}] and server [{1}] ",
262                    UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
263            return HCatClient.create(hiveConf);
264        }
265        catch (HCatException e) {
266            throw new HCatAccessorException(ErrorCode.E1501, e);
267        }
268        catch (IOException e) {
269            throw new HCatAccessorException(ErrorCode.E1501, e);
270        }
271    }
272
273    private HCatClientWithToken getHCatClient(URI uri, Configuration conf, String user)
274            throws HCatAccessorException {
275        final HiveConf hiveConf = getHiveConf(uri, conf);
276        String delegationToken = null;
277        try {
278            // Get UGI to doAs() as the specified user
279            UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
280            // Define the label for the Delegation Token for the HCat instance.
281            hiveConf.set("hive.metastore.token.signature", "HCatTokenSignature");
282            if (hiveConf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, false)) {
283                HCatClient tokenClient = null;
284                try {
285                    // Retrieve Delegation token for HCatalog
286                    tokenClient = HCatClient.create(hiveConf);
287                    delegationToken = tokenClient.getDelegationToken(user, UserGroupInformation.getLoginUser()
288                            .getUserName());
289                    // Store Delegation token in the UGI
290                    Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
291                    token.decodeFromUrlString(delegationToken);
292                    token.setService(new Text(hiveConf.get("hive.metastore.token.signature")));
293                    ugi.addToken(token);
294                }
295                finally {
296                    if (tokenClient != null) {
297                        tokenClient.close();
298                    }
299                }
300            }
301            XLog.getLog(HCatURIHandler.class).info(
302                    "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
303                    UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
304            HCatClient hcatClient = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
305                @Override
306                public HCatClient run() throws Exception {
307                    HCatClient client = HCatClient.create(hiveConf);
308                    return client;
309                }
310            });
311            HCatClientWithToken clientWithToken = new HCatClientWithToken(hcatClient, delegationToken);
312            return clientWithToken;
313        }
314        catch (IOException e) {
315            throw new HCatAccessorException(ErrorCode.E1501, e.getMessage());
316        }
317        catch (Exception e) {
318            throw new HCatAccessorException(ErrorCode.E1501, e.getMessage());
319        }
320    }
321
322    private String getMetastoreConnectURI(URI uri) throws HCatAccessorException {
323        String metastoreURI;
324        // For unit tests
325        if (uri.getAuthority().equals("unittest-local")) {
326            metastoreURI = "";
327        }
328        else {
329            // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
330            // is added
331            HCatURI hCatURI;
332            try {
333                hCatURI = new HCatURI(uri.toString());
334                metastoreURI = hCatURI.getServerEndPointWithScheme("thrift");
335            } catch (URISyntaxException e) {
336                throw new HCatAccessorException(ErrorCode.E0902, e);
337            }
338        }
339        return metastoreURI;
340    }
341
342    private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException {
343        try {
344            boolean flag;
345            HCatURI hcatURI = new HCatURI(uri.toString());
346            if (!hcatURI.getPartitionMap().isEmpty()) {
347                List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
348                        hcatURI.getPartitionMap());
349                flag = partitions != null && !partitions.isEmpty();
350            } else {
351                List<String> tables = client.listTableNamesByPattern(hcatURI.getDb(), hcatURI.getTable());
352                flag = tables != null && !tables.isEmpty();
353            }
354            return (flag);
355        }
356        catch (ConnectionFailureException e) {
357            throw new HCatAccessorException(ErrorCode.E1501, e);
358        }
359        catch (HCatException e) {
360            throw new HCatAccessorException(ErrorCode.E0902, e);
361        }
362        catch (URISyntaxException e) {
363            throw new HCatAccessorException(ErrorCode.E0902, e);
364        }
365        finally {
366            closeQuietly(client, null, closeClient);
367        }
368    }
369
370    private void closeQuietly(HCatClient client, String delegationToken, boolean close) {
371        if (close && client != null) {
372            try {
373                if(delegationToken != null && !delegationToken.isEmpty()) {
374                    client.cancelDelegationToken(delegationToken);
375                }
376                client.close();
377            }
378            catch (Exception ignore) {
379                XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore);
380            }
381        }
382    }
383
384    class HCatClientWithToken {
385        private HCatClient hcatClient;
386        private String token;
387
388        public HCatClientWithToken(HCatClient client, String delegationToken) {
389            this.hcatClient = client;
390            this.token = delegationToken;
391        }
392
393        public HCatClient getHCatClient() {
394            return this.hcatClient;
395        }
396
397        public String getDelegationToken() {
398            return this.token;
399        }
400    }
401
402    static class HCatContext extends Context {
403
404        private HCatClient hcatClient;
405        private String delegationToken;
406
407        /**
408         * Create a HCatContext that can be used to access a hcat URI
409         *
410         * @param conf Configuration to access the URI
411         * @param user name of the user the URI should be accessed as
412         * @param hcatClient HCatClient to talk to hcatalog server
413         */
414        public HCatContext(Configuration conf, String user, HCatClient hcatClient) {
415            super(conf, user);
416            this.hcatClient = hcatClient;
417        }
418
419        public HCatContext(Configuration conf, String user, HCatClientWithToken hcatClient) {
420            super(conf, user);
421            this.hcatClient = hcatClient.getHCatClient();
422            this.delegationToken = hcatClient.getDelegationToken();
423        }
424
425        /**
426         * Get the HCatClient to talk to hcatalog server
427         *
428         * @return HCatClient to talk to hcatalog server
429         */
430        public HCatClient getHCatClient() {
431            return hcatClient;
432        }
433
434        /**
435         * Get the Delegation token to access HCat
436         *
437         * @return delegationToken
438         */
439        public String getDelegationToken() {
440            return delegationToken;
441        }
442
443        @Override
444        public void destroy() {
445            try {
446                if (delegationToken != null && !delegationToken.isEmpty()) {
447                    hcatClient.cancelDelegationToken(delegationToken);
448                }
449                delegationToken = null;
450            }
451            catch (Exception ignore) {
452                XLog.getLog(HCatContext.class).warn("Error cancelling delegation token", ignore);
453            }
454            try {
455                hcatClient.close();
456            }
457            catch (Exception ignore) {
458                XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
459            }
460        }
461
462    }
463
464}