001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.Arrays;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.util.ReflectionUtils;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.action.hadoop.LauncherURIHandler;
032import org.apache.oozie.action.hadoop.LauncherURIHandlerFactory;
033import org.apache.oozie.dependency.FSURIHandler;
034import org.apache.oozie.dependency.URIHandler;
035import org.apache.oozie.dependency.URIHandlerException;
036import org.apache.oozie.util.XLog;
037
038public class URIHandlerService implements Service {
039
040    private static final String CONF_PREFIX = Service.CONF_PREFIX + "URIHandlerService.";
041    public static final String URI_HANDLERS = CONF_PREFIX + "uri.handlers";
042    public static final String URI_HANDLER_DEFAULT = CONF_PREFIX + "uri.handler.default";
043    public static final String URI_HANDLER_SUPPORTED_SCHEMES_PREFIX = CONF_PREFIX + "uri.handler.";
044    public static final String URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX = ".supported.schemes";
045
046    private static XLog LOG = XLog.getLog(URIHandlerService.class);
047    private Configuration launcherConf;
048    private Set<Class<?>> launcherClassesToShip;
049    private Map<String, URIHandler> cache;
050    private URIHandler defaultHandler;
051
052    @Override
053    public void init(Services services) throws ServiceException {
054        try {
055            init(services.getConf());
056        }
057        catch (Exception e) {
058            throw new ServiceException(ErrorCode.E0902, e);
059        }
060    }
061
062    private void init(Configuration conf) throws ClassNotFoundException {
063        cache = new HashMap<String, URIHandler>();
064
065        String[] classes = conf.getStrings(URI_HANDLERS, FSURIHandler.class.getName());
066        for (String classname : classes) {
067            Class<?> clazz = Class.forName(classname.trim());
068            URIHandler uriHandler = (URIHandler) ReflectionUtils.newInstance(clazz, null);
069            uriHandler.init(conf);
070            for (String scheme : uriHandler.getSupportedSchemes()) {
071                cache.put(scheme, uriHandler);
072            }
073        }
074
075        Class<?> defaultClass = conf.getClass(URI_HANDLER_DEFAULT, null);
076        defaultHandler = (defaultClass == null) ? new FSURIHandler() : (URIHandler) ReflectionUtils.newInstance(
077                defaultClass, null);
078        defaultHandler.init(conf);
079        for (String scheme : defaultHandler.getSupportedSchemes()) {
080            cache.put(scheme, defaultHandler);
081        }
082
083        initLauncherClassesToShip();
084        initLauncherURIHandlerConf();
085
086        LOG.info("Loaded urihandlers {0}", Arrays.toString(classes));
087        LOG.info("Loaded default urihandler {0}", defaultHandler.getClass().getName());
088    }
089
090    /**
091     * Initialize classes that need to be shipped for using LauncherURIHandler in the launcher job
092     */
093    private void initLauncherClassesToShip(){
094        launcherClassesToShip = new HashSet<Class<?>>();
095        launcherClassesToShip.add(LauncherURIHandlerFactory.class);
096        launcherClassesToShip.add(LauncherURIHandler.class);
097        for (URIHandler handler : cache.values()) {
098            launcherClassesToShip.add(handler.getLauncherURIHandlerClass());
099            List<Class<?>> classes = handler.getClassesForLauncher();
100            if (classes != null) {
101                launcherClassesToShip.addAll(classes);
102            }
103        }
104        launcherClassesToShip.add(defaultHandler.getLauncherURIHandlerClass());
105    }
106
107    /**
108     * Initialize configuration required for using LauncherURIHandler in the launcher job
109     */
110    private void initLauncherURIHandlerConf() {
111        launcherConf = new Configuration(false);
112
113        for (URIHandler handler : cache.values()) {
114            for (String scheme : handler.getSupportedSchemes()) {
115                String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme;
116                launcherConf.set(schemeConf, handler.getLauncherURIHandlerClass().getName());
117            }
118        }
119
120        for (String scheme : defaultHandler.getSupportedSchemes()) {
121            String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme;
122            launcherConf.set(schemeConf, defaultHandler.getLauncherURIHandlerClass().getName());
123        }
124    }
125
126    @Override
127    public void destroy() {
128        Set<URIHandler> handlers = new HashSet<URIHandler>();
129        handlers.addAll(cache.values());
130        for (URIHandler handler : handlers) {
131            handler.destroy();
132        }
133        cache.clear();
134    }
135
136    @Override
137    public Class<? extends Service> getInterface() {
138        return URIHandlerService.class;
139    }
140
141    /**
142     * Return the classes to be shipped to the launcher
143     * @return the set of classes to be shipped to the launcher
144     */
145    public Set<Class<?>> getClassesForLauncher() {
146        return launcherClassesToShip;
147    }
148
149    /**
150     * Return the configuration required to use LauncherURIHandler in the launcher
151     * @return configuration
152     */
153    public Configuration getLauncherConfig() {
154        return launcherConf;
155    }
156
157    public URIHandler getURIHandler(String uri) throws URIHandlerException {
158        try {
159            return getURIHandler(new URI(uri));
160        }
161        catch (URISyntaxException e) {
162            throw new URIHandlerException(ErrorCode.E0902, e);
163        }
164    }
165
166    public URIHandler getURIHandler(URI uri) throws URIHandlerException {
167        return getURIHandler(uri, false);
168    }
169
170    public URIHandler getURIHandler(URI uri, boolean validateURI) throws URIHandlerException {
171        if (uri.getScheme() == null) {
172            if (validateURI) {
173                throw new URIHandlerException(ErrorCode.E0905, uri);
174            }
175            else {
176                return defaultHandler;
177            }
178        }
179        else {
180            URIHandler handler = cache.get(uri.getScheme());
181            if (handler == null) {
182                handler = cache.get("*");
183                if (handler == null) {
184                    throw new URIHandlerException(ErrorCode.E0904, uri.getScheme(), uri.toString());
185                }
186            }
187            return handler;
188        }
189    }
190
191    /**
192     * Get the URI with scheme://host:port removing the path
193     * @param uri uri template
194     * @return URI with authority and scheme
195     * @throws URIHandlerException
196     */
197    public URI getAuthorityWithScheme(String uri) throws URIHandlerException {
198        int index = uri.indexOf("://");
199        try {
200            if (index == -1) {
201                LOG.trace("Relative path for uri-template "+uri);
202                return new URI("/");
203            }
204            if (uri.indexOf(":///") != -1) {
205                return new URI(uri.substring(0, index + 4));
206            }
207            int pathIndex = uri.indexOf("/", index + 4);
208            if (pathIndex == -1) {
209                return new URI(uri.substring(0));
210            }
211            else {
212                return new URI(uri.substring(0, pathIndex));
213            }
214        }
215        catch (URISyntaxException e) {
216            throw new URIHandlerException(ErrorCode.E0906, uri, e);
217        }
218    }
219
220}