001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.service; 019 020 import java.net.URI; 021 import java.net.URISyntaxException; 022 import java.util.Arrays; 023 import java.util.HashMap; 024 import java.util.HashSet; 025 import java.util.List; 026 import java.util.Map; 027 import java.util.Set; 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.util.ReflectionUtils; 030 import org.apache.oozie.ErrorCode; 031 import org.apache.oozie.action.hadoop.LauncherURIHandler; 032 import org.apache.oozie.action.hadoop.LauncherURIHandlerFactory; 033 import org.apache.oozie.dependency.FSURIHandler; 034 import org.apache.oozie.dependency.URIHandler; 035 import org.apache.oozie.dependency.URIHandlerException; 036 import org.apache.oozie.util.XLog; 037 038 public class URIHandlerService implements Service { 039 040 private static final String CONF_PREFIX = Service.CONF_PREFIX + "URIHandlerService."; 041 public static final String URI_HANDLERS = CONF_PREFIX + "uri.handlers"; 042 public static final String URI_HANDLER_DEFAULT = CONF_PREFIX + "uri.handler.default"; 043 public static final String URI_HANDLER_SUPPORTED_SCHEMES_PREFIX = CONF_PREFIX + "uri.handler."; 044 public static final String URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX = ".supported.schemes"; 045 046 private static XLog LOG = XLog.getLog(URIHandlerService.class); 047 private Configuration launcherConf; 048 private Set<Class<?>> launcherClassesToShip; 049 private Map<String, URIHandler> cache; 050 private URIHandler defaultHandler; 051 052 @Override 053 public void init(Services services) throws ServiceException { 054 try { 055 init(services.getConf()); 056 } 057 catch (Exception e) { 058 throw new ServiceException(ErrorCode.E0902, e); 059 } 060 } 061 062 private void init(Configuration conf) throws ClassNotFoundException { 063 cache = new HashMap<String, URIHandler>(); 064 065 String[] classes = conf.getStrings(URI_HANDLERS, FSURIHandler.class.getName()); 066 for (String classname : classes) { 067 Class<?> clazz = Class.forName(classname.trim()); 068 URIHandler uriHandler = (URIHandler) ReflectionUtils.newInstance(clazz, null); 069 uriHandler.init(conf); 070 for (String scheme : uriHandler.getSupportedSchemes()) { 071 cache.put(scheme, uriHandler); 072 } 073 } 074 075 Class<?> defaultClass = conf.getClass(URI_HANDLER_DEFAULT, null); 076 defaultHandler = (defaultClass == null) ? new FSURIHandler() : (URIHandler) ReflectionUtils.newInstance( 077 defaultClass, null); 078 defaultHandler.init(conf); 079 for (String scheme : defaultHandler.getSupportedSchemes()) { 080 cache.put(scheme, defaultHandler); 081 } 082 083 initLauncherClassesToShip(); 084 initLauncherURIHandlerConf(); 085 086 LOG.info("Loaded urihandlers {0}", Arrays.toString(classes)); 087 LOG.info("Loaded default urihandler {0}", defaultHandler.getClass().getName()); 088 } 089 090 /** 091 * Initialize classes that need to be shipped for using LauncherURIHandler in the launcher job 092 */ 093 private void initLauncherClassesToShip(){ 094 launcherClassesToShip = new HashSet<Class<?>>(); 095 launcherClassesToShip.add(LauncherURIHandlerFactory.class); 096 launcherClassesToShip.add(LauncherURIHandler.class); 097 for (URIHandler handler : cache.values()) { 098 launcherClassesToShip.add(handler.getLauncherURIHandlerClass()); 099 List<Class<?>> classes = handler.getClassesForLauncher(); 100 if (classes != null) { 101 launcherClassesToShip.addAll(classes); 102 } 103 } 104 launcherClassesToShip.add(defaultHandler.getLauncherURIHandlerClass()); 105 } 106 107 /** 108 * Initialize configuration required for using LauncherURIHandler in the launcher job 109 */ 110 private void initLauncherURIHandlerConf() { 111 launcherConf = new Configuration(false); 112 113 for (URIHandler handler : cache.values()) { 114 for (String scheme : handler.getSupportedSchemes()) { 115 String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme; 116 launcherConf.set(schemeConf, handler.getLauncherURIHandlerClass().getName()); 117 } 118 } 119 120 for (String scheme : defaultHandler.getSupportedSchemes()) { 121 String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme; 122 launcherConf.set(schemeConf, defaultHandler.getLauncherURIHandlerClass().getName()); 123 } 124 } 125 126 @Override 127 public void destroy() { 128 Set<URIHandler> handlers = new HashSet<URIHandler>(); 129 handlers.addAll(cache.values()); 130 for (URIHandler handler : handlers) { 131 handler.destroy(); 132 } 133 cache.clear(); 134 } 135 136 @Override 137 public Class<? extends Service> getInterface() { 138 return URIHandlerService.class; 139 } 140 141 /** 142 * Return the classes to be shipped to the launcher 143 * @return the set of classes to be shipped to the launcher 144 */ 145 public Set<Class<?>> getClassesForLauncher() { 146 return launcherClassesToShip; 147 } 148 149 /** 150 * Return the configuration required to use LauncherURIHandler in the launcher 151 * @return configuration 152 */ 153 public Configuration getLauncherConfig() { 154 return launcherConf; 155 } 156 157 public URIHandler getURIHandler(String uri) throws URIHandlerException { 158 try { 159 return getURIHandler(new URI(uri)); 160 } 161 catch (URISyntaxException e) { 162 throw new URIHandlerException(ErrorCode.E0902, e); 163 } 164 } 165 166 public URIHandler getURIHandler(URI uri) throws URIHandlerException { 167 return getURIHandler(uri, false); 168 } 169 170 public URIHandler getURIHandler(URI uri, boolean validateURI) throws URIHandlerException { 171 if (uri.getScheme() == null) { 172 if (validateURI) { 173 throw new URIHandlerException(ErrorCode.E0905, uri); 174 } 175 else { 176 return defaultHandler; 177 } 178 } 179 else { 180 URIHandler handler = cache.get(uri.getScheme()); 181 if (handler == null) { 182 handler = cache.get("*"); 183 if (handler == null) { 184 throw new URIHandlerException(ErrorCode.E0904, uri.getScheme(), uri.toString()); 185 } 186 } 187 return handler; 188 } 189 } 190 191 /** 192 * Get the URI with scheme://host:port removing the path 193 * @param uri uri template 194 * @return URI with authority and scheme 195 * @throws URIHandlerException 196 */ 197 public URI getAuthorityWithScheme(String uri) throws URIHandlerException { 198 int index = uri.indexOf("://"); 199 try { 200 if (index == -1) { 201 LOG.trace("Relative path for uri-template "+uri); 202 return new URI("/"); 203 } 204 if (uri.indexOf(":///") != -1) { 205 return new URI(uri.substring(0, index + 4)); 206 } 207 int pathIndex = uri.indexOf("/", index + 4); 208 if (pathIndex == -1) { 209 return new URI(uri.substring(0)); 210 } 211 else { 212 return new URI(uri.substring(0, pathIndex)); 213 } 214 } 215 catch (URISyntaxException e) { 216 throw new URIHandlerException(ErrorCode.E0906, uri, e); 217 } 218 } 219 220 }