001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.dependency; 020 021import java.io.IOException; 022import java.net.URI; 023import java.util.List; 024import java.util.Set; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.action.hadoop.FSLauncherURIHandler; 031import org.apache.oozie.action.hadoop.LauncherURIHandler; 032import org.apache.oozie.service.HadoopAccessorException; 033import org.apache.oozie.service.HadoopAccessorService; 034import org.apache.oozie.service.Services; 035 036public class FSURIHandler implements URIHandler { 037 038 private HadoopAccessorService service; 039 private Set<String> supportedSchemes; 040 private List<Class<?>> classesToShip; 041 042 @Override 043 public void init(Configuration conf) { 044 service = Services.get().get(HadoopAccessorService.class); 045 supportedSchemes = service.getSupportedSchemes(); 046 classesToShip = new FSLauncherURIHandler().getClassesForLauncher(); 047 } 048 049 @Override 050 public Set<String> getSupportedSchemes() { 051 return supportedSchemes; 052 } 053 054 @Override 055 public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() { 056 return FSLauncherURIHandler.class; 057 } 058 059 @Override 060 public List<Class<?>> getClassesForLauncher() { 061 return classesToShip; 062 } 063 064 @Override 065 public DependencyType getDependencyType(URI uri) throws URIHandlerException { 066 return DependencyType.PULL; 067 } 068 069 @Override 070 public void registerForNotification(URI uri, Configuration conf, String user, String actionID) 071 throws URIHandlerException { 072 throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme()); 073 } 074 075 @Override 076 public boolean unregisterFromNotification(URI uri, String actionID) { 077 throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme()); 078 } 079 080 @Override 081 public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) throws URIHandlerException { 082 FileSystem fs = getFileSystem(uri, conf, user); 083 return new FSContext(conf, user, fs); 084 } 085 086 @Override 087 public boolean exists(URI uri, Context context) throws URIHandlerException { 088 try { 089 FileSystem fs = ((FSContext) context).getFileSystem(); 090 return fs.exists(getNormalizedPath(uri)); 091 } 092 catch (IOException e) { 093 throw new HadoopAccessorException(ErrorCode.E0902, e); 094 } 095 } 096 097 @Override 098 public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException { 099 try { 100 FileSystem fs = getFileSystem(uri, conf, user); 101 return fs.exists(getNormalizedPath(uri)); 102 } 103 catch (HadoopAccessorException e) { 104 if (ErrorCode.E0902.equals(e.getErrorCode()) && e.getMessage() != null 105 && e.getMessage().indexOf("Invalid path for the Har Filesystem. No index file") != -1) { 106 return false; 107 } 108 else { 109 throw e; 110 } 111 } 112 catch (IOException e) { 113 throw new HadoopAccessorException(ErrorCode.E0902, e); 114 } 115 } 116 117 @Override 118 public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException { 119 if (doneFlag.length() > 0) { 120 uri += "/" + doneFlag; 121 } 122 return uri; 123 } 124 125 @Override 126 public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException { 127 if (doneFlag.length() > 0 && uri.endsWith(doneFlag)) { 128 return uri.substring(0, uri.lastIndexOf("/" + doneFlag)); 129 } 130 return uri; 131 } 132 133 134 @Override 135 public void validate(String uri) throws URIHandlerException { 136 } 137 138 @Override 139 public void destroy() { 140 141 } 142 143 @Override 144 public void delete(URI uri, Context context) throws URIHandlerException { 145 FileSystem fs = ((FSContext) context).getFileSystem(); 146 Path path = new Path(uri); 147 try { 148 if (fs.exists(path)) { 149 if (!fs.delete(path, true)) { 150 throw new URIHandlerException(ErrorCode.E0907, path.toString()); 151 } 152 } 153 } 154 catch (IOException e) { 155 throw new URIHandlerException(ErrorCode.E0907, path.toString()); 156 } 157 } 158 159 @Override 160 public void delete(URI uri, Configuration conf, String user) throws URIHandlerException { 161 Path path = new Path(uri); 162 FileSystem fs = getFileSystem(uri, conf, user); 163 try{ 164 if (fs.exists(path)) { 165 if (!fs.delete(path, true)) { 166 throw new URIHandlerException(ErrorCode.E0907, path.toString()); 167 } 168 } 169 } catch (IOException e){ 170 throw new URIHandlerException(ErrorCode.E0907, path.toString()); 171 } 172 } 173 174 private Path getNormalizedPath(URI uri) { 175 // Normalizes uri path replacing // with / in the path which users specify by mistake 176 return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()); 177 } 178 179 private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException { 180 if (user == null) { 181 throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem"); 182 } 183 Configuration fsConf = service.createConfiguration(uri.getAuthority()); 184 return service.createFileSystem(user, uri, fsConf); 185 } 186 187 static class FSContext extends Context { 188 189 private FileSystem fs; 190 191 /** 192 * Create a FSContext that can be used to access a filesystem URI 193 * 194 * @param conf Configuration to access the URI 195 * @param user name of the user the URI should be accessed as 196 * @param fs FileSystem to access 197 */ 198 public FSContext(Configuration conf, String user, FileSystem fs) { 199 super(conf, user); 200 this.fs = fs; 201 } 202 203 /** 204 * Get the FileSystem to access the URI 205 * @return FileSystem to access the URI 206 */ 207 public FileSystem getFileSystem() { 208 return fs; 209 } 210 } 211 212}