001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import java.net.URI; 022import java.net.URISyntaxException; 023import java.util.Arrays; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.util.ReflectionUtils; 031import org.apache.oozie.ErrorCode; 032import org.apache.oozie.action.hadoop.LauncherURIHandler; 033import org.apache.oozie.action.hadoop.LauncherURIHandlerFactory; 034import org.apache.oozie.dependency.FSURIHandler; 035import org.apache.oozie.dependency.URIHandler; 036import org.apache.oozie.dependency.URIHandlerException; 037import org.apache.oozie.util.XLog; 038 039public class URIHandlerService implements Service { 040 041 private static final String CONF_PREFIX = Service.CONF_PREFIX + "URIHandlerService."; 042 public static final String URI_HANDLERS = CONF_PREFIX + "uri.handlers"; 043 public static final String URI_HANDLER_DEFAULT = CONF_PREFIX + "uri.handler.default"; 044 public static final String URI_HANDLER_SUPPORTED_SCHEMES_PREFIX = CONF_PREFIX + "uri.handler."; 045 public static final String URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX = ".supported.schemes"; 046 047 private static XLog LOG = XLog.getLog(URIHandlerService.class); 048 private Configuration launcherConf; 049 private Set<Class<?>> launcherClassesToShip; 050 private Map<String, URIHandler> cache; 051 private URIHandler defaultHandler; 052 053 @Override 054 public void init(Services services) throws ServiceException { 055 try { 056 init(services.getConf()); 057 } 058 catch (Exception e) { 059 throw new ServiceException(ErrorCode.E0902, e); 060 } 061 } 062 063 private void init(Configuration conf) throws ClassNotFoundException { 064 cache = new HashMap<String, URIHandler>(); 065 066 String[] classes = ConfigurationService.getStrings(conf, URI_HANDLERS); 067 for (String classname : classes) { 068 Class<?> clazz = Class.forName(classname.trim()); 069 URIHandler uriHandler = (URIHandler) ReflectionUtils.newInstance(clazz, null); 070 uriHandler.init(conf); 071 for (String scheme : uriHandler.getSupportedSchemes()) { 072 cache.put(scheme, uriHandler); 073 } 074 } 075 076 Class<?> defaultClass = conf.getClass(URI_HANDLER_DEFAULT, null); 077 defaultHandler = (defaultClass == null) ? new FSURIHandler() : (URIHandler) ReflectionUtils.newInstance( 078 defaultClass, null); 079 defaultHandler.init(conf); 080 for (String scheme : defaultHandler.getSupportedSchemes()) { 081 cache.put(scheme, defaultHandler); 082 } 083 084 initLauncherClassesToShip(); 085 initLauncherURIHandlerConf(); 086 087 LOG.info("Loaded urihandlers {0}", Arrays.toString(classes)); 088 LOG.info("Loaded default urihandler {0}", defaultHandler.getClass().getName()); 089 } 090 091 /** 092 * Initialize classes that need to be shipped for using LauncherURIHandler in the launcher job 093 */ 094 private void initLauncherClassesToShip(){ 095 launcherClassesToShip = new HashSet<Class<?>>(); 096 launcherClassesToShip.add(LauncherURIHandlerFactory.class); 097 launcherClassesToShip.add(LauncherURIHandler.class); 098 for (URIHandler handler : cache.values()) { 099 launcherClassesToShip.add(handler.getLauncherURIHandlerClass()); 100 List<Class<?>> classes = handler.getClassesForLauncher(); 101 if (classes != null) { 102 launcherClassesToShip.addAll(classes); 103 } 104 } 105 launcherClassesToShip.add(defaultHandler.getLauncherURIHandlerClass()); 106 } 107 108 /** 109 * Initialize configuration required for using LauncherURIHandler in the launcher job 110 */ 111 private void initLauncherURIHandlerConf() { 112 launcherConf = new Configuration(false); 113 114 for (URIHandler handler : cache.values()) { 115 for (String scheme : handler.getSupportedSchemes()) { 116 String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme; 117 launcherConf.set(schemeConf, handler.getLauncherURIHandlerClass().getName()); 118 } 119 } 120 121 for (String scheme : defaultHandler.getSupportedSchemes()) { 122 String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme; 123 launcherConf.set(schemeConf, defaultHandler.getLauncherURIHandlerClass().getName()); 124 } 125 } 126 127 @Override 128 public void destroy() { 129 Set<URIHandler> handlers = new HashSet<URIHandler>(); 130 handlers.addAll(cache.values()); 131 for (URIHandler handler : handlers) { 132 handler.destroy(); 133 } 134 cache.clear(); 135 } 136 137 @Override 138 public Class<? extends Service> getInterface() { 139 return URIHandlerService.class; 140 } 141 142 /** 143 * Return the classes to be shipped to the launcher 144 * @return the set of classes to be shipped to the launcher 145 */ 146 public Set<Class<?>> getClassesForLauncher() { 147 return launcherClassesToShip; 148 } 149 150 /** 151 * Return the configuration required to use LauncherURIHandler in the launcher 152 * @return configuration 153 */ 154 public Configuration getLauncherConfig() { 155 return launcherConf; 156 } 157 158 public URIHandler getURIHandler(String uri) throws URIHandlerException { 159 try { 160 return getURIHandler(new URI(uri)); 161 } 162 catch (URISyntaxException e) { 163 throw new URIHandlerException(ErrorCode.E0902, e); 164 } 165 } 166 167 public URIHandler getURIHandler(URI uri) throws URIHandlerException { 168 return getURIHandler(uri, false); 169 } 170 171 public URIHandler getURIHandler(URI uri, boolean validateURI) throws URIHandlerException { 172 if (uri.getScheme() == null) { 173 if (validateURI) { 174 throw new URIHandlerException(ErrorCode.E0905, uri); 175 } 176 else { 177 return defaultHandler; 178 } 179 } 180 else { 181 URIHandler handler = cache.get(uri.getScheme()); 182 if (handler == null) { 183 handler = cache.get("*"); 184 if (handler == null) { 185 throw new URIHandlerException(ErrorCode.E0904, uri.getScheme(), uri.toString()); 186 } 187 } 188 return handler; 189 } 190 } 191 192 /** 193 * Get the URI with scheme://host:port removing the path 194 * @param uri uri template 195 * @return URI with authority and scheme 196 * @throws URIHandlerException 197 */ 198 public URI getAuthorityWithScheme(String uri) throws URIHandlerException { 199 int index = uri.indexOf("://"); 200 try { 201 if (index == -1) { 202 LOG.trace("Relative path for uri-template "+uri); 203 return new URI("/"); 204 } 205 if (uri.indexOf(":///") != -1) { 206 return new URI(uri.substring(0, index + 4)); 207 } 208 int pathIndex = uri.indexOf("/", index + 4); 209 if (pathIndex == -1) { 210 return new URI(uri.substring(0)); 211 } 212 else { 213 return new URI(uri.substring(0, pathIndex)); 214 } 215 } 216 catch (URISyntaxException e) { 217 throw new URIHandlerException(ErrorCode.E0906, uri, e); 218 } 219 } 220 221}