This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.dependency;
019
020 import java.io.IOException;
021 import java.net.URI;
022 import java.util.List;
023 import java.util.Set;
024
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.fs.FileSystem;
027 import org.apache.hadoop.fs.Path;
028 import org.apache.oozie.ErrorCode;
029 import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
030 import org.apache.oozie.action.hadoop.LauncherURIHandler;
031 import org.apache.oozie.service.HadoopAccessorException;
032 import org.apache.oozie.service.HadoopAccessorService;
033 import org.apache.oozie.service.Services;
034
035 public class FSURIHandler implements URIHandler {
036
037 private HadoopAccessorService service;
038 private Set<String> supportedSchemes;
039 private List<Class<?>> classesToShip;
040
041 @Override
042 public void init(Configuration conf) {
043 service = Services.get().get(HadoopAccessorService.class);
044 supportedSchemes = service.getSupportedSchemes();
045 classesToShip = new FSLauncherURIHandler().getClassesForLauncher();
046 }
047
048 @Override
049 public Set<String> getSupportedSchemes() {
050 return supportedSchemes;
051 }
052
053 @Override
054 public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
055 return FSLauncherURIHandler.class;
056 }
057
058 @Override
059 public List<Class<?>> getClassesForLauncher() {
060 return classesToShip;
061 }
062
063 @Override
064 public DependencyType getDependencyType(URI uri) throws URIHandlerException {
065 return DependencyType.PULL;
066 }
067
068 @Override
069 public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
070 throws URIHandlerException {
071 throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
072 }
073
074 @Override
075 public boolean unregisterFromNotification(URI uri, String actionID) {
076 throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
077 }
078
079 @Override
080 public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
081 FileSystem fs = getFileSystem(uri, conf, user);
082 return new FSContext(conf, user, fs);
083 }
084
085 @Override
086 public boolean exists(URI uri, Context context) throws URIHandlerException {
087 try {
088 FileSystem fs = ((FSContext) context).getFileSystem();
089 return fs.exists(getNormalizedPath(uri));
090 }
091 catch (IOException e) {
092 throw new HadoopAccessorException(ErrorCode.E0902, e);
093 }
094 }
095
096 @Override
097 public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
098 try {
099 FileSystem fs = getFileSystem(uri, conf, user);
100 return fs.exists(getNormalizedPath(uri));
101 }
102 catch (IOException e) {
103 throw new HadoopAccessorException(ErrorCode.E0902, e);
104 }
105 }
106
107 @Override
108 public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
109 if (doneFlag.length() > 0) {
110 uri += "/" + doneFlag;
111 }
112 return uri;
113 }
114
115 @Override
116 public void validate(String uri) throws URIHandlerException {
117 }
118
119 @Override
120 public void destroy() {
121
122 }
123
124 private Path getNormalizedPath(URI uri) {
125 // Normalizes uri path replacing // with / in the path which users specify by mistake
126 return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
127 }
128
129 private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
130 if (user == null) {
131 throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
132 }
133 Configuration fsConf = service.createJobConf(uri.getAuthority());
134 return service.createFileSystem(user, uri, fsConf);
135 }
136
137 static class FSContext extends Context {
138
139 private FileSystem fs;
140
141 /**
142 * Create a FSContext that can be used to access a filesystem URI
143 *
144 * @param conf Configuration to access the URI
145 * @param user name of the user the URI should be accessed as
146 * @param fs FileSystem to access
147 */
148 public FSContext(Configuration conf, String user, FileSystem fs) {
149 super(conf, user);
150 this.fs = fs;
151 }
152
153 /**
154 * Get the FileSystem to access the URI
155 * @return FileSystem to access the URI
156 */
157 public FileSystem getFileSystem() {
158 return fs;
159 }
160 }
161
162 }