001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.dependency;
019    
020    import java.io.IOException;
021    import java.net.URI;
022    import java.util.List;
023    import java.util.Set;
024    
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.fs.FileSystem;
027    import org.apache.hadoop.fs.Path;
028    import org.apache.oozie.ErrorCode;
029    import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
030    import org.apache.oozie.action.hadoop.LauncherURIHandler;
031    import org.apache.oozie.service.HadoopAccessorException;
032    import org.apache.oozie.service.HadoopAccessorService;
033    import org.apache.oozie.service.Services;
034    
035    public class FSURIHandler implements URIHandler {
036    
037        private HadoopAccessorService service;
038        private Set<String> supportedSchemes;
039        private List<Class<?>> classesToShip;
040    
041        @Override
042        public void init(Configuration conf) {
043            service = Services.get().get(HadoopAccessorService.class);
044            supportedSchemes = service.getSupportedSchemes();
045            classesToShip = new FSLauncherURIHandler().getClassesForLauncher();
046        }
047    
048        @Override
049        public Set<String> getSupportedSchemes() {
050            return supportedSchemes;
051        }
052    
053        @Override
054        public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
055            return FSLauncherURIHandler.class;
056        }
057    
058        @Override
059        public List<Class<?>> getClassesForLauncher() {
060            return classesToShip;
061        }
062    
063        @Override
064        public DependencyType getDependencyType(URI uri) throws URIHandlerException {
065            return DependencyType.PULL;
066        }
067    
068        @Override
069        public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
070                throws URIHandlerException {
071            throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
072        }
073    
074        @Override
075        public boolean unregisterFromNotification(URI uri, String actionID) {
076            throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
077        }
078    
079        @Override
080        public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
081            FileSystem fs = getFileSystem(uri, conf, user);
082            return new FSContext(conf, user, fs);
083        }
084    
085        @Override
086        public boolean exists(URI uri, Context context) throws URIHandlerException {
087            try {
088                FileSystem fs = ((FSContext) context).getFileSystem();
089                return fs.exists(getNormalizedPath(uri));
090            }
091            catch (IOException e) {
092                throw new HadoopAccessorException(ErrorCode.E0902, e);
093            }
094        }
095    
096        @Override
097        public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
098            try {
099                FileSystem fs = getFileSystem(uri, conf, user);
100                return fs.exists(getNormalizedPath(uri));
101            }
102            catch (IOException e) {
103                throw new HadoopAccessorException(ErrorCode.E0902, e);
104            }
105        }
106    
107        @Override
108        public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
109            if (doneFlag.length() > 0) {
110                uri += "/" + doneFlag;
111            }
112            return uri;
113        }
114    
115        @Override
116        public void validate(String uri) throws URIHandlerException {
117        }
118    
119        @Override
120        public void destroy() {
121    
122        }
123    
124        private Path getNormalizedPath(URI uri) {
125            // Normalizes uri path replacing // with / in the path which users specify by mistake
126            return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
127        }
128    
129        private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
130            if (user == null) {
131                throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
132            }
133            Configuration fsConf = service.createJobConf(uri.getAuthority());
134            return service.createFileSystem(user, uri, fsConf);
135        }
136    
137        static class FSContext extends Context {
138    
139            private FileSystem fs;
140    
141            /**
142             * Create a FSContext that can be used to access a filesystem URI
143             *
144             * @param conf Configuration to access the URI
145             * @param user name of the user the URI should be accessed as
146             * @param fs FileSystem to access
147             */
148            public FSContext(Configuration conf, String user, FileSystem fs) {
149                super(conf, user);
150                this.fs = fs;
151            }
152    
153            /**
154             * Get the FileSystem to access the URI
155             * @return FileSystem to access the URI
156             */
157            public FileSystem getFileSystem() {
158                return fs;
159            }
160        }
161    
162    }