/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.dependency;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
import org.apache.oozie.action.hadoop.LauncherURIHandler;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;

/**
 * {@link URIHandler} for URIs that are accessed through a Hadoop {@link FileSystem}.
 * <p>
 * The supported schemes are the ones reported by {@link HadoopAccessorService}. Dependencies are
 * always reported as {@link DependencyType#PULL}, and notification registration is not supported.
 * {@link #init(Configuration)} must be called before any method that needs the accessor service.
 */
public class FSURIHandler implements URIHandler {

    private HadoopAccessorService service;
    private Set<String> supportedSchemes;
    private List<Class<?>> classesToShip;

    /**
     * Looks up the {@link HadoopAccessorService} and caches the supported schemes and the classes
     * that {@link FSLauncherURIHandler} reports for the launcher.
     *
     * @param conf not used by this implementation
     */
    @Override
    public void init(Configuration conf) {
        service = Services.get().get(HadoopAccessorService.class);
        supportedSchemes = service.getSupportedSchemes();
        classesToShip = new FSLauncherURIHandler().getClassesForLauncher();
    }

    /**
     * @return the schemes obtained from the {@link HadoopAccessorService} during {@link #init(Configuration)}
     */
    @Override
    public Set<String> getSupportedSchemes() {
        return supportedSchemes;
    }

    /**
     * @return {@link FSLauncherURIHandler}, the launcher-side counterpart of this handler
     */
    @Override
    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
        return FSLauncherURIHandler.class;
    }

    /**
     * @return the classes obtained from {@link FSLauncherURIHandler} during {@link #init(Configuration)}
     */
    @Override
    public List<Class<?>> getClassesForLauncher() {
        return classesToShip;
    }

    /**
     * @param uri not inspected
     * @return always {@link DependencyType#PULL}
     */
    @Override
    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
        return DependencyType.PULL;
    }

    /**
     * Not supported by this handler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
            throws URIHandlerException {
        throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
    }

    /**
     * Not supported by this handler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean unregisterFromNotification(URI uri, String actionID) {
        throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
    }

    /**
     * Creates an {@link FSContext} wrapping a {@link FileSystem} opened for the given URI and user.
     *
     * @param uri URI whose file system should be opened
     * @param conf configuration stored in the returned context
     * @param user user to access the file system as; must not be {@code null}
     * @param readOnly not used by this implementation
     * @return a new {@link FSContext}
     * @throws URIHandlerException if the file system cannot be obtained or {@code user} is {@code null}
     */
    @Override
    public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) throws URIHandlerException {
        FileSystem fs = getFileSystem(uri, conf, user);
        return new FSContext(conf, user, fs);
    }

    /**
     * Checks whether the URI exists, using the file system held by the given context.
     *
     * @param uri URI to check
     * @param context context that must be an {@link FSContext}
     * @return {@code true} if the normalized path exists
     * @throws URIHandlerException with {@link ErrorCode#E0902} if the file system check fails
     */
    @Override
    public boolean exists(URI uri, Context context) throws URIHandlerException {
        FileSystem fs = ((FSContext) context).getFileSystem();
        return pathExists(fs, uri);
    }

    /**
     * Checks whether the URI exists, opening a file system for the given user.
     *
     * @param uri URI to check
     * @param conf not used; the file system configuration is created by the accessor service
     * @param user user to access the file system as; must not be {@code null}
     * @return {@code true} if the normalized path exists
     * @throws URIHandlerException with {@link ErrorCode#E0902} if the file system cannot be
     *         obtained or the check fails
     */
    @Override
    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
        FileSystem fs = getFileSystem(uri, conf, user);
        return pathExists(fs, uri);
    }

    /**
     * Appends the done flag to the URI as an extra path segment.
     *
     * @param uri base URI string
     * @param doneFlag name of the done flag; must not be {@code null}. An empty string leaves
     *        {@code uri} unchanged
     * @return {@code uri + "/" + doneFlag}, or {@code uri} when {@code doneFlag} is empty
     */
    @Override
    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
        if (doneFlag.length() > 0) {
            uri += "/" + doneFlag;
        }
        return uri;
    }

    /**
     * No validation is performed for file system URIs.
     */
    @Override
    public void validate(String uri) throws URIHandlerException {
    }

    /**
     * Nothing to release.
     */
    @Override
    public void destroy() {
    }

    /**
     * Recursively deletes the URI, if it exists, using the file system held by the given context.
     *
     * @param uri URI to delete
     * @param context context that must be an {@link FSContext}
     * @throws URIHandlerException with {@link ErrorCode#E0907} if the delete fails
     */
    @Override
    public void delete(URI uri, Context context) throws URIHandlerException {
        FileSystem fs = ((FSContext) context).getFileSystem();
        Path path = new Path(uri);
        deleteIfExists(fs, path);
    }

    /**
     * Recursively deletes the URI, if it exists, opening a file system for the given user.
     *
     * @param uri URI to delete
     * @param conf not used; the file system configuration is created by the accessor service
     * @param user user to access the file system as; must not be {@code null}
     * @throws URIHandlerException with {@link ErrorCode#E0907} if the delete fails, or
     *         {@link ErrorCode#E0902} if the file system cannot be obtained
     */
    @Override
    public void delete(URI uri, Configuration conf, String user) throws URIHandlerException {
        Path path = new Path(uri);
        FileSystem fs = getFileSystem(uri, conf, user);
        deleteIfExists(fs, path);
    }

    /**
     * Checks existence of the normalized URI path, translating I/O failures to {@link ErrorCode#E0902}.
     */
    private boolean pathExists(FileSystem fs, URI uri) throws HadoopAccessorException {
        try {
            return fs.exists(getNormalizedPath(uri));
        }
        catch (IOException e) {
            throw new HadoopAccessorException(ErrorCode.E0902, e);
        }
    }

    /**
     * Recursively deletes {@code path} when it exists; a missing path is not an error.
     */
    private void deleteIfExists(FileSystem fs, Path path) throws URIHandlerException {
        try {
            if (fs.exists(path) && !fs.delete(path, true)) {
                throw new URIHandlerException(ErrorCode.E0907, path.toString());
            }
        }
        catch (IOException e) {
            // Hand the IOException over as the trailing argument so the root cause is not lost.
            // NOTE(review): relies on URIHandlerException treating a trailing Throwable as the
            // cause, as the HadoopAccessorException(ErrorCode, Throwable) calls above suggest - confirm.
            throw new URIHandlerException(ErrorCode.E0907, path.toString(), e);
        }
    }

    private Path getNormalizedPath(URI uri) {
        // Normalizes uri path replacing // with / in the path which users specify by mistake
        return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
    }

    /**
     * Opens a {@link FileSystem} for the URI through the {@link HadoopAccessorService}.
     *
     * @param uri URI whose authority selects the job configuration and whose file system is opened
     * @param conf not used; the configuration comes from {@code service.createJobConf}
     * @param user user to access the file system as
     * @throws HadoopAccessorException with {@link ErrorCode#E0902} if {@code user} is {@code null},
     *         or whatever the accessor service throws
     */
    private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
        if (user == null) {
            throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
        }
        Configuration fsConf = service.createJobConf(uri.getAuthority());
        return service.createFileSystem(user, uri, fsConf);
    }

    /**
     * {@link Context} that additionally carries the {@link FileSystem} used to access a URI.
     */
    static class FSContext extends Context {

        private final FileSystem fs;

        /**
         * Create a FSContext that can be used to access a filesystem URI
         *
         * @param conf Configuration to access the URI
         * @param user name of the user the URI should be accessed as
         * @param fs FileSystem to access
         */
        public FSContext(Configuration conf, String user, FileSystem fs) {
            super(conf, user);
            this.fs = fs;
        }

        /**
         * Get the FileSystem to access the URI
         * @return FileSystem to access the URI
         */
        public FileSystem getFileSystem() {
            return fs;
        }
    }

}