001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.dependency;
019    
020    import java.io.IOException;
021    import java.net.URI;
022    import java.net.URISyntaxException;
023    import java.util.Arrays;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.hive.conf.HiveConf;
032    import org.apache.hadoop.security.UserGroupInformation;
033    import org.apache.hcatalog.api.ConnectionFailureException;
034    import org.apache.hcatalog.api.HCatClient;
035    import org.apache.hcatalog.api.HCatPartition;
036    import org.apache.hcatalog.common.HCatException;
037    import org.apache.oozie.ErrorCode;
038    import org.apache.oozie.action.hadoop.HCatLauncherURIHandler;
039    import org.apache.oozie.action.hadoop.LauncherURIHandler;
040    import org.apache.oozie.dependency.hcat.HCatMessageHandler;
041    import org.apache.oozie.service.HCatAccessorException;
042    import org.apache.oozie.service.HCatAccessorService;
043    import org.apache.oozie.service.PartitionDependencyManagerService;
044    import org.apache.oozie.service.Services;
045    import org.apache.oozie.service.URIHandlerService;
046    import org.apache.oozie.util.HCatURI;
047    import org.apache.oozie.util.XLog;
048    
049    public class HCatURIHandler implements URIHandler {
050    
051        private Set<String> supportedSchemes;
052        private Map<String, DependencyType> dependencyTypes;
053        private List<Class<?>> classesToShip;
054    
055        @Override
056        public void init(Configuration conf) {
057            dependencyTypes = new HashMap<String, DependencyType>();
058            supportedSchemes = new HashSet<String>();
059            String[] schemes = conf.getStrings(URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_PREFIX
060                    + this.getClass().getSimpleName() + URIHandlerService.URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX, "hcat");
061            supportedSchemes.addAll(Arrays.asList(schemes));
062            classesToShip = new HCatLauncherURIHandler().getClassesForLauncher();
063        }
064    
065        @Override
066        public Set<String> getSupportedSchemes() {
067            return supportedSchemes;
068        }
069    
070        @Override
071        public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
072            return HCatLauncherURIHandler.class;
073        }
074    
075        @Override
076        public List<Class<?>> getClassesForLauncher() {
077            return classesToShip;
078        }
079    
080        @Override
081        public DependencyType getDependencyType(URI uri) throws URIHandlerException {
082            DependencyType depType = DependencyType.PULL;
083            // Not initializing in constructor as this will be part of oozie.services.ext
084            // and will be initialized after URIHandlerService
085            HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
086            if (hcatService != null) {
087                depType = dependencyTypes.get(uri.getAuthority());
088                if (depType == null) {
089                    depType = hcatService.isKnownPublisher(uri) ? DependencyType.PUSH : DependencyType.PULL;
090                    dependencyTypes.put(uri.getAuthority(), depType);
091                }
092            }
093            return depType;
094        }
095    
096        @Override
097        public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
098                throws URIHandlerException {
099            HCatURI hcatURI;
100            try {
101                hcatURI = new HCatURI(uri);
102            }
103            catch (URISyntaxException e) {
104                throw new URIHandlerException(ErrorCode.E0906, uri, e);
105            }
106            HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
107            if (!hcatService.isRegisteredForNotification(hcatURI)) {
108                HCatClient client = getHCatClient(uri, conf, user);
109                try {
110                    String topic = client.getMessageBusTopicName(hcatURI.getDb(), hcatURI.getTable());
111                    if (topic == null) {
112                        return;
113                    }
114                    hcatService.registerForNotification(hcatURI, topic, new HCatMessageHandler(uri.getAuthority()));
115                }
116                catch (HCatException e) {
117                    throw new HCatAccessorException(ErrorCode.E1501, e);
118                }
119                finally {
120                    closeQuietly(client, true);
121                }
122            }
123            PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
124            pdmService.addMissingDependency(hcatURI, actionID);
125        }
126    
127        @Override
128        public boolean unregisterFromNotification(URI uri, String actionID) {
129            HCatURI hcatURI;
130            try {
131                hcatURI = new HCatURI(uri);
132            }
133            catch (URISyntaxException e) {
134                throw new RuntimeException(e); // Unexpected at this point
135            }
136            PartitionDependencyManagerService pdmService = Services.get().get(PartitionDependencyManagerService.class);
137            return pdmService.removeMissingDependency(hcatURI, actionID);
138        }
139    
140        @Override
141        public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
142            HCatClient client = getHCatClient(uri, conf, user);
143            return new HCatContext(conf, user, client);
144        }
145    
146        @Override
147        public boolean exists(URI uri, Context context) throws URIHandlerException {
148            HCatClient client = ((HCatContext) context).getHCatClient();
149            return exists(uri, client, false);
150        }
151    
152        @Override
153        public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
154            HCatClient client = getHCatClient(uri, conf, user);
155            return exists(uri, client, true);
156        }
157    
158        @Override
159        public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
160            return uri;
161        }
162    
163        @Override
164        public void validate(String uri) throws URIHandlerException {
165            try {
166                new HCatURI(uri); // will fail if uri syntax is incorrect
167            }
168            catch (URISyntaxException e) {
169                throw new URIHandlerException(ErrorCode.E0906, uri, e);
170            }
171    
172        }
173    
174        @Override
175        public void destroy() {
176    
177        }
178    
179        private HCatClient getHCatClient(URI uri, Configuration conf, String user) throws HCatAccessorException {
180            final HiveConf hiveConf = new HiveConf(conf, this.getClass());
181            String serverURI = getMetastoreConnectURI(uri);
182            if (!serverURI.equals("")) {
183                hiveConf.set("hive.metastore.local", "false");
184            }
185            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
186            try {
187                XLog.getLog(HCatURIHandler.class).info(
188                        "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
189                        UserGroupInformation.getLoginUser(), serverURI);
190    
191                // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs
192                // We are good to connect as the oozie user since listPartitions does not require
193                // authorization
194                /*
195                UserGroupInformation ugi = ugiService.getProxyUser(user);
196                return ugi.doAs(new PrivilegedExceptionAction<HCatClient>() {
197                    public HCatClient run() throws Exception {
198                        return HCatClient.create(hiveConf);
199                    }
200                });
201                */
202    
203                return HCatClient.create(hiveConf);
204            }
205            catch (HCatException e) {
206                throw new HCatAccessorException(ErrorCode.E1501, e);
207            }
208            catch (IOException e) {
209                throw new HCatAccessorException(ErrorCode.E1501, e);
210            }
211    
212        }
213    
214        private String getMetastoreConnectURI(URI uri) {
215            String metastoreURI;
216            // For unit tests
217            if (uri.getAuthority().equals("unittest-local")) {
218                metastoreURI = "";
219            }
220            else {
221                // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
222                // is added
223                metastoreURI = "thrift://" + uri.getAuthority();
224            }
225            return metastoreURI;
226        }
227    
228        private boolean exists(URI uri, HCatClient client, boolean closeClient) throws HCatAccessorException {
229            try {
230                HCatURI hcatURI = new HCatURI(uri.toString());
231                List<HCatPartition> partitions = client.getPartitions(hcatURI.getDb(), hcatURI.getTable(),
232                        hcatURI.getPartitionMap());
233                return (partitions != null && !partitions.isEmpty());
234            }
235            catch (ConnectionFailureException e) {
236                throw new HCatAccessorException(ErrorCode.E1501, e);
237            }
238            catch (HCatException e) {
239                throw new HCatAccessorException(ErrorCode.E0902, e);
240            }
241            catch (URISyntaxException e) {
242                throw new HCatAccessorException(ErrorCode.E0902, e);
243            }
244            finally {
245                closeQuietly(client, closeClient);
246            }
247        }
248    
249        private void closeQuietly(HCatClient client, boolean close) {
250            if (close && client != null) {
251                try {
252                    client.close();
253                }
254                catch (Exception ignore) {
255                    XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore);
256                }
257            }
258        }
259    
260        static class HCatContext extends Context {
261    
262            private HCatClient hcatClient;
263    
264            /**
265             * Create a HCatContext that can be used to access a hcat URI
266             *
267             * @param conf Configuration to access the URI
268             * @param user name of the user the URI should be accessed as
269             * @param hcatClient HCatClient to talk to hcatalog server
270             */
271            public HCatContext(Configuration conf, String user, HCatClient hcatClient) {
272                super(conf, user);
273                this.hcatClient = hcatClient;
274            }
275    
276            /**
277             * Get the HCatClient to talk to hcatalog server
278             *
279             * @return HCatClient to talk to hcatalog server
280             */
281            public HCatClient getHCatClient() {
282                return hcatClient;
283            }
284    
285            @Override
286            public void destroy() {
287                try {
288                    hcatClient.close();
289                }
290                catch (Exception ignore) {
291                    XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
292                }
293            }
294    
295        }
296    
297    }