001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.service;
019    
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.Arrays;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.util.ReflectionUtils;
030    import org.apache.oozie.ErrorCode;
031    import org.apache.oozie.action.hadoop.LauncherURIHandler;
032    import org.apache.oozie.action.hadoop.LauncherURIHandlerFactory;
033    import org.apache.oozie.dependency.FSURIHandler;
034    import org.apache.oozie.dependency.URIHandler;
035    import org.apache.oozie.dependency.URIHandlerException;
036    import org.apache.oozie.util.XLog;
037    
038    public class URIHandlerService implements Service {
039    
040        private static final String CONF_PREFIX = Service.CONF_PREFIX + "URIHandlerService.";
041        public static final String URI_HANDLERS = CONF_PREFIX + "uri.handlers";
042        public static final String URI_HANDLER_DEFAULT = CONF_PREFIX + "uri.handler.default";
043        public static final String URI_HANDLER_SUPPORTED_SCHEMES_PREFIX = CONF_PREFIX + "uri.handler.";
044        public static final String URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX = ".supported.schemes";
045    
046        private static XLog LOG = XLog.getLog(URIHandlerService.class);
047        private Configuration launcherConf;
048        private Set<Class<?>> launcherClassesToShip;
049        private Map<String, URIHandler> cache;
050        private URIHandler defaultHandler;
051    
052        @Override
053        public void init(Services services) throws ServiceException {
054            try {
055                init(services.getConf());
056            }
057            catch (Exception e) {
058                throw new ServiceException(ErrorCode.E0902, e);
059            }
060        }
061    
062        private void init(Configuration conf) throws ClassNotFoundException {
063            cache = new HashMap<String, URIHandler>();
064    
065            String[] classes = conf.getStrings(URI_HANDLERS, FSURIHandler.class.getName());
066            for (String classname : classes) {
067                Class<?> clazz = Class.forName(classname.trim());
068                URIHandler uriHandler = (URIHandler) ReflectionUtils.newInstance(clazz, null);
069                uriHandler.init(conf);
070                for (String scheme : uriHandler.getSupportedSchemes()) {
071                    cache.put(scheme, uriHandler);
072                }
073            }
074    
075            Class<?> defaultClass = conf.getClass(URI_HANDLER_DEFAULT, null);
076            defaultHandler = (defaultClass == null) ? new FSURIHandler() : (URIHandler) ReflectionUtils.newInstance(
077                    defaultClass, null);
078            defaultHandler.init(conf);
079            for (String scheme : defaultHandler.getSupportedSchemes()) {
080                cache.put(scheme, defaultHandler);
081            }
082    
083            initLauncherClassesToShip();
084            initLauncherURIHandlerConf();
085    
086            LOG.info("Loaded urihandlers {0}", Arrays.toString(classes));
087            LOG.info("Loaded default urihandler {0}", defaultHandler.getClass().getName());
088        }
089    
090        /**
091         * Initialize classes that need to be shipped for using LauncherURIHandler in the launcher job
092         */
093        private void initLauncherClassesToShip(){
094            launcherClassesToShip = new HashSet<Class<?>>();
095            launcherClassesToShip.add(LauncherURIHandlerFactory.class);
096            launcherClassesToShip.add(LauncherURIHandler.class);
097            for (URIHandler handler : cache.values()) {
098                launcherClassesToShip.add(handler.getLauncherURIHandlerClass());
099                List<Class<?>> classes = handler.getClassesForLauncher();
100                if (classes != null) {
101                    launcherClassesToShip.addAll(classes);
102                }
103            }
104            launcherClassesToShip.add(defaultHandler.getLauncherURIHandlerClass());
105        }
106    
107        /**
108         * Initialize configuration required for using LauncherURIHandler in the launcher job
109         */
110        private void initLauncherURIHandlerConf() {
111            launcherConf = new Configuration(false);
112    
113            for (URIHandler handler : cache.values()) {
114                for (String scheme : handler.getSupportedSchemes()) {
115                    String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme;
116                    launcherConf.set(schemeConf, handler.getLauncherURIHandlerClass().getName());
117                }
118            }
119    
120            for (String scheme : defaultHandler.getSupportedSchemes()) {
121                String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme;
122                launcherConf.set(schemeConf, defaultHandler.getLauncherURIHandlerClass().getName());
123            }
124        }
125    
126        @Override
127        public void destroy() {
128            Set<URIHandler> handlers = new HashSet<URIHandler>();
129            handlers.addAll(cache.values());
130            for (URIHandler handler : handlers) {
131                handler.destroy();
132            }
133            cache.clear();
134        }
135    
136        @Override
137        public Class<? extends Service> getInterface() {
138            return URIHandlerService.class;
139        }
140    
141        /**
142         * Return the classes to be shipped to the launcher
143         * @return the set of classes to be shipped to the launcher
144         */
145        public Set<Class<?>> getClassesForLauncher() {
146            return launcherClassesToShip;
147        }
148    
149        /**
150         * Return the configuration required to use LauncherURIHandler in the launcher
151         * @return configuration
152         */
153        public Configuration getLauncherConfig() {
154            return launcherConf;
155        }
156    
157        public URIHandler getURIHandler(String uri) throws URIHandlerException {
158            try {
159                return getURIHandler(new URI(uri));
160            }
161            catch (URISyntaxException e) {
162                throw new URIHandlerException(ErrorCode.E0902, e);
163            }
164        }
165    
166        public URIHandler getURIHandler(URI uri) throws URIHandlerException {
167            return getURIHandler(uri, false);
168        }
169    
170        public URIHandler getURIHandler(URI uri, boolean validateURI) throws URIHandlerException {
171            if (uri.getScheme() == null) {
172                if (validateURI) {
173                    throw new URIHandlerException(ErrorCode.E0905, uri);
174                }
175                else {
176                    return defaultHandler;
177                }
178            }
179            else {
180                URIHandler handler = cache.get(uri.getScheme());
181                if (handler == null) {
182                    handler = cache.get("*");
183                    if (handler == null) {
184                        throw new URIHandlerException(ErrorCode.E0904, uri.getScheme(), uri.toString());
185                    }
186                }
187                return handler;
188            }
189        }
190    
191        /**
192         * Get the URI with scheme://host:port removing the path
193         * @param uri uri template
194         * @return URI with authority and scheme
195         * @throws URIHandlerException
196         */
197        public URI getAuthorityWithScheme(String uri) throws URIHandlerException {
198            int index = uri.indexOf("://");
199            try {
200                if (index == -1) {
201                    LOG.trace("Relative path for uri-template "+uri);
202                    return new URI("/");
203                }
204                if (uri.indexOf(":///") != -1) {
205                    return new URI(uri.substring(0, index + 4));
206                }
207                int pathIndex = uri.indexOf("/", index + 4);
208                if (pathIndex == -1) {
209                    return new URI(uri.substring(0));
210                }
211                else {
212                    return new URI(uri.substring(0, pathIndex));
213                }
214            }
215            catch (URISyntaxException e) {
216                throw new URIHandlerException(ErrorCode.E0906, uri, e);
217            }
218        }
219    
220    }