001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.dependency; 019 020 import java.io.IOException; 021 import java.net.URI; 022 import java.util.List; 023 import java.util.Set; 024 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.hadoop.fs.FileSystem; 027 import org.apache.hadoop.fs.Path; 028 import org.apache.oozie.ErrorCode; 029 import org.apache.oozie.action.hadoop.FSLauncherURIHandler; 030 import org.apache.oozie.action.hadoop.LauncherURIHandler; 031 import org.apache.oozie.service.HadoopAccessorException; 032 import org.apache.oozie.service.HadoopAccessorService; 033 import org.apache.oozie.service.Services; 034 035 public class FSURIHandler implements URIHandler { 036 037 private HadoopAccessorService service; 038 private Set<String> supportedSchemes; 039 private List<Class<?>> classesToShip; 040 041 @Override 042 public void init(Configuration conf) { 043 service = Services.get().get(HadoopAccessorService.class); 044 supportedSchemes = service.getSupportedSchemes(); 045 classesToShip = new FSLauncherURIHandler().getClassesForLauncher(); 046 } 047 048 @Override 049 public Set<String> getSupportedSchemes() { 050 return supportedSchemes; 051 } 052 053 @Override 054 public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() { 055 return FSLauncherURIHandler.class; 056 } 057 058 @Override 059 public List<Class<?>> getClassesForLauncher() { 060 return classesToShip; 061 } 062 063 @Override 064 public DependencyType getDependencyType(URI uri) throws URIHandlerException { 065 return DependencyType.PULL; 066 } 067 068 @Override 069 public void registerForNotification(URI uri, Configuration conf, String user, String actionID) 070 throws URIHandlerException { 071 throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme()); 072 } 073 074 @Override 075 public boolean unregisterFromNotification(URI uri, String actionID) { 076 throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme()); 077 } 078 079 @Override 080 public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException { 081 FileSystem fs = getFileSystem(uri, conf, user); 082 return new FSContext(conf, user, fs); 083 } 084 085 @Override 086 public boolean exists(URI uri, Context context) throws URIHandlerException { 087 try { 088 FileSystem fs = ((FSContext) context).getFileSystem(); 089 return fs.exists(getNormalizedPath(uri)); 090 } 091 catch (IOException e) { 092 throw new HadoopAccessorException(ErrorCode.E0902, e); 093 } 094 } 095 096 @Override 097 public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException { 098 try { 099 FileSystem fs = getFileSystem(uri, conf, user); 100 return fs.exists(getNormalizedPath(uri)); 101 } 102 catch (IOException e) { 103 throw new HadoopAccessorException(ErrorCode.E0902, e); 104 } 105 } 106 107 @Override 108 public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException { 109 if (doneFlag.length() > 0) { 110 uri += "/" + doneFlag; 111 } 112 return uri; 113 } 114 115 @Override 116 public void validate(String uri) throws URIHandlerException { 117 } 118 119 @Override 120 public void destroy() { 121 122 } 123 124 private Path getNormalizedPath(URI uri) { 125 // Normalizes uri path replacing // with / in the path which users specify by mistake 126 return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()); 127 } 128 129 private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException { 130 if (user == null) { 131 throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem"); 132 } 133 Configuration fsConf = service.createJobConf(uri.getAuthority()); 134 return service.createFileSystem(user, uri, fsConf); 135 } 136 137 static class FSContext extends Context { 138 139 private FileSystem fs; 140 141 /** 142 * Create a FSContext that can be used to access a filesystem URI 143 * 144 * @param conf Configuration to access the URI 145 * @param user name of the user the URI should be accessed as 146 * @param fs FileSystem to access 147 */ 148 public FSContext(Configuration conf, String user, FileSystem fs) { 149 super(conf, user); 150 this.fs = fs; 151 } 152 153 /** 154 * Get the FileSystem to access the URI 155 * @return FileSystem to access the URI 156 */ 157 public FileSystem getFileSystem() { 158 return fs; 159 } 160 } 161 162 }