001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.dependency;
020
021import java.io.IOException;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.security.PrivilegedExceptionAction;
025import java.util.Arrays;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hive.conf.HiveConf;
034import org.apache.hadoop.security.UserGroupInformation;
035import org.apache.hive.hcatalog.api.ConnectionFailureException;
036import org.apache.hive.hcatalog.api.HCatClient;
037import org.apache.hive.hcatalog.api.HCatPartition;
038import org.apache.hive.hcatalog.common.HCatException;
039import org.apache.oozie.ErrorCode;
040import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
041import org.apache.oozie.action.hadoop.LauncherURIHandler;
042import org.apache.oozie.dependency.hcat.HCatMessageHandler;
043import org.apache.oozie.service.HCatAccessorException;
044import org.apache.oozie.service.HCatAccessorService;
045import org.apache.oozie.service.PartitionDependencyManagerService;
046import org.apache.oozie.service.Services;
047import org.apache.oozie.service.URIHandlerService;
048import org.apache.oozie.util.HCatURI;
049import org.apache.oozie.util.XLog;
050import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
051import org.apache.hadoop.io.Text;
052import org.apache.hadoop.security.token.Token;
053
054public class HCatURIHandler implements URIHandler {
055
056    private Set<String> supportedSchemes;
057    private Map<String, DependencyType> dependencyTypes;
058    private List<Class<?>> classesToShip;
059
060    @Override
061    public void init(Configuration conf) {
062        dependencyTypes = new HashMap<String, DependencyType>();
063        supportedSchemes = new HashSet<String>();
064        String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
065                + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
066        supportedSchemes.addAll(Arrays.asList(schemes));
067        classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
068    }
069
070    @Override
071    public Set<String> getSupportedSchemes() {
072        return supportedSchemes;
073    }
074
075    @Override
076    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
077        return HCatLauncherURIHandler.class;
078    }
079
080    @Override
081    public List<Class<?>> getClassesForLauncher() {
082        return classesToShip;
083    }
084
085    @Override
086    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
087        DependencyType depType = DependencyType.PULL;
088        // Not initializing in constructor as this will be part of oozie.services.ext
089        // and will be initialized after URIHandlerService
090        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
091        if (hcatService != null) {
092            depType = dependencyTypes.get(uri.getAuthority());
093            if (depType == null) {
094                depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
095                dependencyTypes.put(uri.getAuthority(), depType);
096            }
097        }
098        return depType;
099    }
100
101    @Override
102    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
103            throws URIHandlerException {
104        HCatURI hcatURI;
105        try {
106            hcatURI = new HCatURI(uri);
107        }
108        catch (URISyntaxException e) {
109            throw new URIHandlerException(ErrorCode.E0906, uri, e);
110        }
111        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
112        if (!hcatService.isRegisteredForNotification(hcatURI)) {
113            HCatClient client = getHCatClient(uri, conf);
114            try {
115                String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable());
116                if (topic == null) {
117                    return;
118                }
119                hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
120            }
121            catch (HCatException e) {
122                throw new HCatAccessorException(ErrorCode.E1501, e);
123            }
124            finally {
125                closeQuietly(client, null, true);
126            }
127        }
128        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
129        pdmService.addMissingDependency(hcatURI, actionID);
130    }
131
132    @Override
133    public boolean unregisterFromNotification(URI uri, String actionID) {
134        HCatURI hcatURI;
135        try {
136            hcatURI = new HCatURI(uri);
137        }
138        catch (URISyntaxException e) {
139            throw new RuntimeException(e); // Unexpected at this point
140        }
141        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
142        return pdmService.removeMissingDependency(hcatURI, actionID);
143    }
144
145    @Override
146    public Context getContext(URI uri, Configuration conf, String user, boolean readOnly)
147            throws URIHandlerException {
148        HCatContext context = null;
149        //read operations are allowed for any user in HCat and so accessing as Oozie server itself
150        //For write operations, perform doAs as user
151        if (readOnly) {
152            HCatClient client = getHCatClient(uri, conf);
153            context = new HCatContext(conf, user, client);
154        }
155        else {
156            HCatClientWithToken client = getHCatClient(uri, conf, user);
157            context = new HCatContext(conf, user, client);
158        }
159        return context;
160    }
161
162    @Override
163    public boolean exists(URI uri, Context context) throws URIHandlerException {
164        HCatClient client = ((HCatContext) context).getHCatClient();
165        return exists(uri, client, false);
166    }
167
168    @Override
169    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
170        HCatClient client = getHCatClient(uri, conf);
171        return exists(uri, client, true);
172    }
173
174    @Override
175    public void delete(URI uri, Context context) throws URIHandlerException {
176        HCatClient client = ((HCatContext) context).getHCatClient();
177        try {
178            HCatURI hcatUri  = new HCatURI(uri);
179            client.dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true);
180        }
181        catch (URISyntaxException e) {
182            throw new HCatAccessorException(ErrorCode.E1501, e);
183        }
184        catch (HCatException e) {
185            throw new HCatAccessorException(ErrorCode.E1501, e);
186        }
187    }
188
189    @Override
190    public void delete(URI uri, Configuration conf, String user) throws URIHandlerException {
191        HCatClientWithToken client = null;
192        try {
193            HCatURI hcatUri = new HCatURI(uri);
194            client = getHCatClient(uri, conf, user);
195            client.getHCatClient().dropPartitions(hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap(), true);
196        }
197        catch (URISyntaxException e){
198            throw new HCatAccessorException(ErrorCode.E1501, e);
199        }
200        catch (HCatException e) {
201            throw new HCatAccessorException(ErrorCode.E1501, e);
202        }
203        finally {
204            closeQuietly(client.getHCatClient(), client.getDelegationToken(),true);
205        }
206    }
207
208    @Override
209    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
210        return uri;
211    }
212
213    @Override
214    public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException {
215        return uri;
216    }
217
218    @Override
219    public void validate(String uri) throws URIHandlerException {
220        try {
221            new HCatURI(uri); // will fail if uri syntax is incorrect
222        }
223        catch (URISyntaxException e) {
224            throw new URIHandlerException(ErrorCode.E0906, uri, e);
225        }
226
227    }
228
229    @Override
230    public void destroy() {
231
232    }
233
234    private HiveConf getHiveConf(URI uri, Configuration conf){
235        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
236        if (hcatService.getHCatConf() != null) {
237            conf = hcatService.getHCatConf();
238        }
239        HiveConf hiveConf = new HiveConf(conf, this.getClass());
240        String serverURI = getMetastoreConnectURI(uri);
241        if (!serverURI.equals("")) {
242            hiveConf.set("hive.metastore.local", "false");
243        }
244        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
245        return hiveConf;
246    }
247
248    private HCatClient getHCatClient(URI uri, Configuration conf) throws HCatAccessorException {
249        HiveConf hiveConf = getHiveConf(uri, conf);
250        try {
251            XLog.getLog(HCatURIHandler.class).info("Creating HCatClient for login_user [{0}] and server [{1}] ",
252                    UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
253            return HCatClient.create(hiveConf);
254        }
255        catch (HCatException e) {
256            throw new HCatAccessorException(ErrorCode.E1501, e);
257        }
258        catch (IOException e) {
259            throw new HCatAccessorException(ErrorCode.E1501, e);
260        }
261    }
262
263    private HCatClientWithToken getHCatClient(URI uri, Configuration conf, String user)
264            throws HCatAccessorException {
265        final HiveConf hiveConf = getHiveConf(uri, conf);
266        String delegationToken = null;
267        try {
268            // Get UGI to doAs() as the specified user
269            UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
270            // Define the label for the Delegation Token for the HCat instance.
271            hiveConf.set("hive.metastore.token.signature", "HCatTokenSignature");
272            if (hiveConf.getBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, false)) {
273                HCatClient tokenClient = null;
274                try {
275                    // Retrieve Delegation token for HCatalog
276                    tokenClient = HCatClient.create(hiveConf);
277                    delegationToken = tokenClient.getDelegationToken(user, UserGroupInformation.getLoginUser()
278                            .getUserName());
279                    // Store Delegation token in the UGI
280                    Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
281                    token.decodeFromUrlString(delegationToken);
282                    token.setService(new Text(hiveConf.get("hive.metastore.token.signature")));
283                    ugi.addToken(token);
284                }
285                finally {
286                    if (tokenClient != null) {
287                        tokenClient.close();
288                    }
289                }
290            }
291            XLog.getLog(HCatURIHandler.class).info(
292                    "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
293                    UserGroupInformation.getLoginUser(), hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
294            HCatClient hcatClient = ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
295                @Override
296                public HCatClient run() throws Exception {
297                    HCatClient client = HCatClient.create(hiveConf);
298                    return client;
299                }
300            });
301            HCatClientWithToken clientWithToken = new HCatClientWithToken(hcatClient, delegationToken);
302            return clientWithToken;
303        }
304        catch (IOException e) {
305            throw new HCatAccessorException(ErrorCode.E1501, e.getMessage());
306        }
307        catch (Exception e) {
308            throw new HCatAccessorException(ErrorCode.E1501, e.getMessage());
309        }
310    }
311
312    private String getMetastoreConnectURI(URI uri) {
313        String metastoreURI;
314        // For unit tests
315        if (uri.getAuthority().equals("unittest-local")) {
316            metastoreURI = "";
317        }
318        else {
319            // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
320            // is added
321            metastoreURI = "thrift://" + uri.getAuthority();
322        }
323        return metastoreURI;
324    }
325
326    private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException {
327        try {
328            HCatURI hcatURI = new HCatURI(uri.toString());
329            List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
330                    hcatURI.getPartitionMap());
331            return (partitions != null && !partitions.isEmpty());
332        }
333        catch (ConnectionFailureException e) {
334            throw new HCatAccessorException(ErrorCode.E1501, e);
335        }
336        catch (HCatException e) {
337            throw new HCatAccessorException(ErrorCode.E0902, e);
338        }
339        catch (URISyntaxException e) {
340            throw new HCatAccessorException(ErrorCode.E0902, e);
341        }
342        finally {
343            closeQuietly(client, null, closeClient);
344        }
345    }
346
347    private void closeQuietly(HCatClient client, String delegationToken, boolean close) {
348        if (close && client != null) {
349            try {
350                if(delegationToken != null && !delegationToken.isEmpty()) {
351                    client.cancelDelegationToken(delegationToken);
352                }
353                client.close();
354            }
355            catch (Exception ignore) {
356                XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore);
357            }
358        }
359    }
360
361    class HCatClientWithToken {
362        private HCatClient hcatClient;
363        private String token;
364
365        public HCatClientWithToken(HCatClient client, String delegationToken) {
366            this.hcatClient = client;
367            this.token = delegationToken;
368        }
369
370        public HCatClient getHCatClient() {
371            return this.hcatClient;
372        }
373
374        public String getDelegationToken() {
375            return this.token;
376        }
377    }
378
379    static class HCatContext extends Context {
380
381        private HCatClient hcatClient;
382        private String delegationToken;
383
384        /**
385         * Create a HCatContext that can be used to access a hcat URI
386         *
387         * @param conf Configuration to access the URI
388         * @param user name of the user the URI should be accessed as
389         * @param hcatClient HCatClient to talk to hcatalog server
390         */
391        public HCatContext(Configuration conf, String user, HCatClient hcatClient) {
392            super(conf, user);
393            this.hcatClient = hcatClient;
394        }
395
396        public HCatContext(Configuration conf, String user, HCatClientWithToken hcatClient) {
397            super(conf, user);
398            this.hcatClient = hcatClient.getHCatClient();
399            this.delegationToken = hcatClient.getDelegationToken();
400        }
401
402        /**
403         * Get the HCatClient to talk to hcatalog server
404         *
405         * @return HCatClient to talk to hcatalog server
406         */
407        public HCatClient getHCatClient() {
408            return hcatClient;
409        }
410
411        /**
412         * Get the Delegation token to access HCat
413         *
414         * @return delegationToken
415         */
416        public String getDelegationToken() {
417            return delegationToken;
418        }
419
420        @Override
421        public void destroy() {
422            try {
423                if (delegationToken != null && !delegationToken.isEmpty()) {
424                    hcatClient.cancelDelegationToken(delegationToken);
425                }
426                delegationToken = null;
427            }
428            catch (Exception ignore) {
429                XLog.getLog(HCatContext.class).warn("Error cancelling delegation token", ignore);
430            }
431            try {
432                hcatClient.close();
433            }
434            catch (Exception ignore) {
435                XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
436            }
437        }
438
439    }
440
441}