001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.dependency;
019
020import java.io.IOException;
021import java.net.URI;
022import java.util.List;
023import java.util.Set;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.oozie.ErrorCode;
029import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
030import org.apache.oozie.action.hadoop.LauncherURIHandler;
031import org.apache.oozie.service.HadoopAccessorException;
032import org.apache.oozie.service.HadoopAccessorService;
033import org.apache.oozie.service.Services;
034
035public class FSURIHandler implements URIHandler {
036
037    private HadoopAccessorService service;
038    private Set<String> supportedSchemes;
039    private List<Class<?>> classesToShip;
040
041    @Override
042    public void init(Configuration conf) {
043        service = Services.get().get(HadoopAccessorService.class);
044        supportedSchemes = service.getSupportedSchemes();
045        classesToShip = new FSLauncherURIHandler().getClassesForLauncher();
046    }
047
048    @Override
049    public Set<String> getSupportedSchemes() {
050        return supportedSchemes;
051    }
052
053    @Override
054    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
055        return FSLauncherURIHandler.class;
056    }
057
058    @Override
059    public List<Class<?>> getClassesForLauncher() {
060        return classesToShip;
061    }
062
063    @Override
064    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
065        return DependencyType.PULL;
066    }
067
068    @Override
069    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
070            throws URIHandlerException {
071        throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
072    }
073
074    @Override
075    public boolean unregisterFromNotification(URI uri, String actionID) {
076        throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
077    }
078
079    @Override
080    public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
081        FileSystem fs = getFileSystem(uri, conf, user);
082        return new FSContext(conf, user, fs);
083    }
084
085    @Override
086    public boolean exists(URI uri, Context context) throws URIHandlerException {
087        try {
088            FileSystem fs = ((FSContext) context).getFileSystem();
089            return fs.exists(getNormalizedPath(uri));
090        }
091        catch (IOException e) {
092            throw new HadoopAccessorException(ErrorCode.E0902, e);
093        }
094    }
095
096    @Override
097    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
098        try {
099            FileSystem fs = getFileSystem(uri, conf, user);
100            return fs.exists(getNormalizedPath(uri));
101        }
102        catch (IOException e) {
103            throw new HadoopAccessorException(ErrorCode.E0902, e);
104        }
105    }
106
107    @Override
108    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
109        if (doneFlag.length() > 0) {
110            uri += "/" + doneFlag;
111        }
112        return uri;
113    }
114
115    @Override
116    public void validate(String uri) throws URIHandlerException {
117    }
118
119    @Override
120    public void destroy() {
121
122    }
123
124    private Path getNormalizedPath(URI uri) {
125        // Normalizes uri path replacing // with / in the path which users specify by mistake
126        return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
127    }
128
129    private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
130        if (user == null) {
131            throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
132        }
133        Configuration fsConf = service.createJobConf(uri.getAuthority());
134        return service.createFileSystem(user, uri, fsConf);
135    }
136
137    static class FSContext extends Context {
138
139        private FileSystem fs;
140
141        /**
142         * Create a FSContext that can be used to access a filesystem URI
143         *
144         * @param conf Configuration to access the URI
145         * @param user name of the user the URI should be accessed as
146         * @param fs FileSystem to access
147         */
148        public FSContext(Configuration conf, String user, FileSystem fs) {
149            super(conf, user);
150            this.fs = fs;
151        }
152
153        /**
154         * Get the FileSystem to access the URI
155         * @return FileSystem to access the URI
156         */
157        public FileSystem getFileSystem() {
158            return fs;
159        }
160    }
161
162}