001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.dependency;
019
020import java.io.IOException;
021import java.net.URI;
022import java.net.URISyntaxException;
023import java.util.Arrays;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hive.conf.HiveConf;
032import org.apache.hadoop.security.UserGroupInformation;
033import org.apache.hive.hcatalog.api.ConnectionFailureException;
034import org.apache.hive.hcatalog.api.HCatClient;
035import org.apache.hive.hcatalog.api.HCatPartition;
036import org.apache.hive.hcatalog.common.HCatException;
037import org.apache.oozie.ErrorCode;
038import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
039import org.apache.oozie.action.hadoop.LauncherURIHandler;
040import org.apache.oozie.dependency.hcat.HCatMessageHandler;
041import org.apache.oozie.service.HCatAccessorException;
042import org.apache.oozie.service.HCatAccessorService;
043import org.apache.oozie.service.PartitionDependencyManagerService;
044import org.apache.oozie.service.Services;
045import org.apache.oozie.service.URIHandlerService;
046import org.apache.oozie.util.HCatURI;
047import org.apache.oozie.util.XLog;
048
049public class HCatURIHandler implements URIHandler {
050
051    private Set<String> supportedSchemes;
052    private Map<String, DependencyType> dependencyTypes;
053    private List<Class<?>> classesToShip;
054
055    @Override
056    public void init(Configuration conf) {
057        dependencyTypes = new HashMap<String, DependencyType>();
058        supportedSchemes = new HashSet<String>();
059        String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
060                + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
061        supportedSchemes.addAll(Arrays.asList(schemes));
062        classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
063    }
064
065    @Override
066    public Set<String> getSupportedSchemes() {
067        return supportedSchemes;
068    }
069
070    @Override
071    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
072        return HCatLauncherURIHandler.class;
073    }
074
075    @Override
076    public List<Class<?>> getClassesForLauncher() {
077        return classesToShip;
078    }
079
080    @Override
081    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
082        DependencyType depType = DependencyType.PULL;
083        // Not initializing in constructor as this will be part of oozie.services.ext
084        // and will be initialized after URIHandlerService
085        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
086        if (hcatService != null) {
087            depType = dependencyTypes.get(uri.getAuthority());
088            if (depType == null) {
089                depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
090                dependencyTypes.put(uri.getAuthority(), depType);
091            }
092        }
093        return depType;
094    }
095
096    @Override
097    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
098            throws URIHandlerException {
099        HCatURI hcatURI;
100        try {
101            hcatURI = new HCatURI(uri);
102        }
103        catch (URISyntaxException e) {
104            throw new URIHandlerException(ErrorCode.E0906, uri, e);
105        }
106        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
107        if (!hcatService.isRegisteredForNotification(hcatURI)) {
108            HCatClient client = getHCatClient(uri, conf, user);
109            try {
110                String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable());
111                if (topic == null) {
112                    return;
113                }
114                hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
115            }
116            catch (HCatException e) {
117                throw new HCatAccessorException(ErrorCode.E1501, e);
118            }
119            finally {
120                closeQuietly(client, true);
121            }
122        }
123        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
124        pdmService.addMissingDependency(hcatURI, actionID);
125    }
126
127    @Override
128    public boolean unregisterFromNotification(URI uri, String actionID) {
129        HCatURI hcatURI;
130        try {
131            hcatURI = new HCatURI(uri);
132        }
133        catch (URISyntaxException e) {
134            throw new RuntimeException(e); // Unexpected at this point
135        }
136        PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
137        return pdmService.removeMissingDependency(hcatURI, actionID);
138    }
139
140    @Override
141    public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
142        HCatClient client = getHCatClient(uri, conf, user);
143        return new HCatContext(conf, user, client);
144    }
145
146    @Override
147    public boolean exists(URI uri, Context context) throws URIHandlerException {
148        HCatClient client = ((HCatContext) context).getHCatClient();
149        return exists(uri, client, false);
150    }
151
152    @Override
153    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
154        HCatClient client = getHCatClient(uri, conf, user);
155        return exists(uri, client, true);
156    }
157
158    @Override
159    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
160        return uri;
161    }
162
163    @Override
164    public void validate(String uri) throws URIHandlerException {
165        try {
166            new HCatURI(uri); // will fail if uri syntax is incorrect
167        }
168        catch (URISyntaxException e) {
169            throw new URIHandlerException(ErrorCode.E0906, uri, e);
170        }
171
172    }
173
174    @Override
175    public void destroy() {
176
177    }
178
179    private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws HCatAccessorException {
180        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
181        if (hcatService.getHCatConf() != null) {
182            conf = hcatService.getHCatConf();
183        }
184        final HiveConf hiveConf = new HiveConf(conf, this.getClass());
185        String serverURI = getMetastoreConnectURI(uri);
186        if (!serverURI.equals("")) {
187            hiveConf.set("hive.metastore.local", "false");
188        }
189        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
190        try {
191            XLog.getLog(HCatURIHandler.class).info(
192                    "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
193                    UserGroupInformation.getLoginUser(), serverURI);
194
195            // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs
196            // We are good to connect as the oozie user since listPartitions does not require
197            // authorization
198            /*
199            UserGroupInformation ugi = ugiService.getProxyUser(user);
200            return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
201                public HCatClient run() throws Exception {
202                    return HCatClient.create(hiveConf);
203                }
204            });
205            */
206
207            return HCatClient.create(hiveConf);
208        }
209        catch (HCatException e) {
210            throw new HCatAccessorException(ErrorCode.E1501, e);
211        }
212        catch (IOException e) {
213            throw new HCatAccessorException(ErrorCode.E1501, e);
214        }
215
216    }
217
218    private String getMetastoreConnectURI(URI uri) {
219        String metastoreURI;
220        // For unit tests
221        if (uri.getAuthority().equals("unittest-local")) {
222            metastoreURI = "";
223        }
224        else {
225            // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
226            // is added
227            metastoreURI = "thrift://" + uri.getAuthority();
228        }
229        return metastoreURI;
230    }
231
232    private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException {
233        try {
234            HCatURI hcatURI = new HCatURI(uri.toString());
235            List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
236                    hcatURI.getPartitionMap());
237            return (partitions != null && !partitions.isEmpty());
238        }
239        catch (ConnectionFailureException e) {
240            throw new HCatAccessorException(ErrorCode.E1501, e);
241        }
242        catch (HCatException e) {
243            throw new HCatAccessorException(ErrorCode.E0902, e);
244        }
245        catch (URISyntaxException e) {
246            throw new HCatAccessorException(ErrorCode.E0902, e);
247        }
248        finally {
249            closeQuietly(client, closeClient);
250        }
251    }
252
253    private void closeQuietly(HCatClient client, boolean close) {
254        if (close && client != null) {
255            try {
256                client.close();
257            }
258            catch (Exception ignore) {
259                XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore);
260            }
261        }
262    }
263
264    static class HCatContext extends Context {
265
266        private HCatClient hcatClient;
267
268        /**
269         * Create a HCatContext that can be used to access a hcat URI
270         *
271         * @param conf Configuration to access the URI
272         * @param user name of the user the URI should be accessed as
273         * @param hcatClient HCatClient to talk to hcatalog server
274         */
275        public HCatContext(Configuration conf, String user, HCatClient hcatClient) {
276            super(conf, user);
277            this.hcatClient = hcatClient;
278        }
279
280        /**
281         * Get the HCatClient to talk to hcatalog server
282         *
283         * @return HCatClient to talk to hcatalog server
284         */
285        public HCatClient getHCatClient() {
286            return hcatClient;
287        }
288
289        @Override
290        public void destroy() {
291            try {
292                hcatClient.close();
293            }
294            catch (Exception ignore) {
295                XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
296            }
297        }
298
299    }
300
301}