This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.service;
019
020 import java.net.URI;
021 import java.net.URISyntaxException;
022 import java.util.Arrays;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Set;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.util.ReflectionUtils;
030 import org.apache.oozie.ErrorCode;
031 import org.apache.oozie.action.hadoop.LauncherURIHandler;
032 import org.apache.oozie.action.hadoop.LauncherURIHandlerFactory;
033 import org.apache.oozie.dependency.FSURIHandler;
034 import org.apache.oozie.dependency.URIHandler;
035 import org.apache.oozie.dependency.URIHandlerException;
036 import org.apache.oozie.util.XLog;
037
038 public class URIHandlerService implements Service {
039
040 private static final String CONF_PREFIX = Service.CONF_PREFIX + "URIHandlerService.";
041 public static final String URI_HANDLERS = CONF_PREFIX + "uri.handlers";
042 public static final String URI_HANDLER_DEFAULT = CONF_PREFIX + "uri.handler.default";
043 public static final String URI_HANDLER_SUPPORTED_SCHEMES_PREFIX = CONF_PREFIX + "uri.handler.";
044 public static final String URI_HANDLER_SUPPORTED_SCHEMES_SUFFIX = ".supported.schemes";
045
046 private static XLog LOG = XLog.getLog(URIHandlerService.class);
047 private Configuration launcherConf;
048 private Set<Class<?>> launcherClassesToShip;
049 private Map<String, URIHandler> cache;
050 private URIHandler defaultHandler;
051
052 @Override
053 public void init(Services services) throws ServiceException {
054 try {
055 init(services.getConf());
056 }
057 catch (Exception e) {
058 throw new ServiceException(ErrorCode.E0902, e);
059 }
060 }
061
062 private void init(Configuration conf) throws ClassNotFoundException {
063 cache = new HashMap<String, URIHandler>();
064
065 String[] classes = conf.getStrings(URI_HANDLERS, FSURIHandler.class.getName());
066 for (String classname : classes) {
067 Class<?> clazz = Class.forName(classname.trim());
068 URIHandler uriHandler = (URIHandler) ReflectionUtils.newInstance(clazz, null);
069 uriHandler.init(conf);
070 for (String scheme : uriHandler.getSupportedSchemes()) {
071 cache.put(scheme, uriHandler);
072 }
073 }
074
075 Class<?> defaultClass = conf.getClass(URI_HANDLER_DEFAULT, null);
076 defaultHandler = (defaultClass == null) ? new FSURIHandler() : (URIHandler) ReflectionUtils.newInstance(
077 defaultClass, null);
078 defaultHandler.init(conf);
079 for (String scheme : defaultHandler.getSupportedSchemes()) {
080 cache.put(scheme, defaultHandler);
081 }
082
083 initLauncherClassesToShip();
084 initLauncherURIHandlerConf();
085
086 LOG.info("Loaded urihandlers {0}", Arrays.toString(classes));
087 LOG.info("Loaded default urihandler {0}", defaultHandler.getClass().getName());
088 }
089
090 /**
091 * Initialize classes that need to be shipped for using LauncherURIHandler in the launcher job
092 */
093 private void initLauncherClassesToShip(){
094 launcherClassesToShip = new HashSet<Class<?>>();
095 launcherClassesToShip.add(LauncherURIHandlerFactory.class);
096 launcherClassesToShip.add(LauncherURIHandler.class);
097 for (URIHandler handler : cache.values()) {
098 launcherClassesToShip.add(handler.getLauncherURIHandlerClass());
099 List<Class<?>> classes = handler.getClassesForLauncher();
100 if (classes != null) {
101 launcherClassesToShip.addAll(classes);
102 }
103 }
104 launcherClassesToShip.add(defaultHandler.getLauncherURIHandlerClass());
105 }
106
107 /**
108 * Initialize configuration required for using LauncherURIHandler in the launcher job
109 */
110 private void initLauncherURIHandlerConf() {
111 launcherConf = new Configuration(false);
112
113 for (URIHandler handler : cache.values()) {
114 for (String scheme : handler.getSupportedSchemes()) {
115 String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme;
116 launcherConf.set(schemeConf, handler.getLauncherURIHandlerClass().getName());
117 }
118 }
119
120 for (String scheme : defaultHandler.getSupportedSchemes()) {
121 String schemeConf = LauncherURIHandlerFactory.CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + scheme;
122 launcherConf.set(schemeConf, defaultHandler.getLauncherURIHandlerClass().getName());
123 }
124 }
125
126 @Override
127 public void destroy() {
128 Set<URIHandler> handlers = new HashSet<URIHandler>();
129 handlers.addAll(cache.values());
130 for (URIHandler handler : handlers) {
131 handler.destroy();
132 }
133 cache.clear();
134 }
135
136 @Override
137 public Class<? extends Service> getInterface() {
138 return URIHandlerService.class;
139 }
140
141 /**
142 * Return the classes to be shipped to the launcher
143 * @return the set of classes to be shipped to the launcher
144 */
145 public Set<Class<?>> getClassesForLauncher() {
146 return launcherClassesToShip;
147 }
148
149 /**
150 * Return the configuration required to use LauncherURIHandler in the launcher
151 * @return configuration
152 */
153 public Configuration getLauncherConfig() {
154 return launcherConf;
155 }
156
157 public URIHandler getURIHandler(String uri) throws URIHandlerException {
158 try {
159 return getURIHandler(new URI(uri));
160 }
161 catch (URISyntaxException e) {
162 throw new URIHandlerException(ErrorCode.E0902, e);
163 }
164 }
165
166 public URIHandler getURIHandler(URI uri) throws URIHandlerException {
167 return getURIHandler(uri, false);
168 }
169
170 public URIHandler getURIHandler(URI uri, boolean validateURI) throws URIHandlerException {
171 if (uri.getScheme() == null) {
172 if (validateURI) {
173 throw new URIHandlerException(ErrorCode.E0905, uri);
174 }
175 else {
176 return defaultHandler;
177 }
178 }
179 else {
180 URIHandler handler = cache.get(uri.getScheme());
181 if (handler == null) {
182 handler = cache.get("*");
183 if (handler == null) {
184 throw new URIHandlerException(ErrorCode.E0904, uri.getScheme(), uri.toString());
185 }
186 }
187 return handler;
188 }
189 }
190
191 /**
192 * Get the URI with scheme://host:port removing the path
193 * @param uri uri template
194 * @return URI with authority and scheme
195 * @throws URIHandlerException
196 */
197 public URI getAuthorityWithScheme(String uri) throws URIHandlerException {
198 int index = uri.indexOf("://");
199 try {
200 if (index == -1) {
201 LOG.trace("Relative path for uri-template "+uri);
202 return new URI("/");
203 }
204 if (uri.indexOf(":///") != -1) {
205 return new URI(uri.substring(0, index + 4));
206 }
207 int pathIndex = uri.indexOf("/", index + 4);
208 if (pathIndex == -1) {
209 return new URI(uri.substring(0));
210 }
211 else {
212 return new URI(uri.substring(0, pathIndex));
213 }
214 }
215 catch (URISyntaxException e) {
216 throw new URIHandlerException(ErrorCode.E0906, uri, e);
217 }
218 }
219
220 }