This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.dependency;
020
021import java.io.IOException;
022import java.net.URI;
023import java.util.List;
024import java.util.Set;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
031import org.apache.oozie.action.hadoop.LauncherURIHandler;
032import org.apache.oozie.service.HadoopAccessorException;
033import org.apache.oozie.service.HadoopAccessorService;
034import org.apache.oozie.service.Services;
035
036public class FSURIHandler implements URIHandler {
037
038    private HadoopAccessorService service;
039    private Set<String> supportedSchemes;
040    private List<Class<?>> classesToShip;
041
042    @Override
043    public void init(Configuration conf) {
044        service = Services.get().get(HadoopAccessorService.class);
045        supportedSchemes = service.getSupportedSchemes();
046        classesToShip = new FSLauncherURIHandler().getClassesForLauncher();
047    }
048
049    @Override
050    public Set<String> getSupportedSchemes() {
051        return supportedSchemes;
052    }
053
054    @Override
055    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
056        return FSLauncherURIHandler.class;
057    }
058
059    @Override
060    public List<Class<?>> getClassesForLauncher() {
061        return classesToShip;
062    }
063
064    @Override
065    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
066        return DependencyType.PULL;
067    }
068
069    @Override
070    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
071            throws URIHandlerException {
072        throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
073    }
074
075    @Override
076    public boolean unregisterFromNotification(URI uri, String actionID) {
077        throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
078    }
079
080    @Override
081    public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) throws URIHandlerException {
082        FileSystem fs = getFileSystem(uri, conf, user);
083        return new FSContext(conf, user, fs);
084    }
085
086    @Override
087    public boolean exists(URI uri, Context context) throws URIHandlerException {
088        try {
089            FileSystem fs = ((FSContext) context).getFileSystem();
090            return fs.exists(getNormalizedPath(uri));
091        }
092        catch (IOException e) {
093            throw new HadoopAccessorException(ErrorCode.E0902, e);
094        }
095    }
096
097    @Override
098    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
099        try {
100            FileSystem fs = getFileSystem(uri, conf, user);
101            return fs.exists(getNormalizedPath(uri));
102        }
103        catch (IOException e) {
104            throw new HadoopAccessorException(ErrorCode.E0902, e);
105        }
106    }
107
108    @Override
109    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
110        if (doneFlag.length() > 0) {
111            uri += "/" + doneFlag;
112        }
113        return uri;
114    }
115
116    @Override
117    public void validate(String uri) throws URIHandlerException {
118    }
119
120    @Override
121    public void destroy() {
122
123    }
124
125    @Override
126    public void delete(URI uri, Context context) throws URIHandlerException {
127        FileSystem fs = ((FSContext) context).getFileSystem();
128        Path path = new Path(uri);
129        try {
130            if (fs.exists(path)) {
131                if (!fs.delete(path, true)) {
132                    throw new URIHandlerException(ErrorCode.E0907, path.toString());
133                }
134            }
135        }
136        catch (IOException e) {
137            throw new URIHandlerException(ErrorCode.E0907, path.toString());
138        }
139    }
140
141    @Override
142    public void delete(URI uri, Configuration conf, String user) throws URIHandlerException {
143        Path path = new Path(uri);
144        FileSystem fs = getFileSystem(uri, conf, user);
145        try{
146            if (fs.exists(path)) {
147                if (!fs.delete(path, true)) {
148                    throw new URIHandlerException(ErrorCode.E0907, path.toString());
149                }
150            }
151        } catch (IOException e){
152            throw new URIHandlerException(ErrorCode.E0907, path.toString());
153        }
154    }
155
156    private Path getNormalizedPath(URI uri) {
157        // Normalizes uri path replacing // with / in the path which users specify by mistake
158        return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
159    }
160
161    private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
162        if (user == null) {
163            throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
164        }
165        Configuration fsConf = service.createJobConf(uri.getAuthority());
166        return service.createFileSystem(user, uri, fsConf);
167    }
168
169    static class FSContext extends Context {
170
171        private FileSystem fs;
172
173        /**
174         * Create a FSContext that can be used to access a filesystem URI
175         *
176         * @param conf Configuration to access the URI
177         * @param user name of the user the URI should be accessed as
178         * @param fs FileSystem to access
179         */
180        public FSContext(Configuration conf, String user, FileSystem fs) {
181            super(conf, user);
182            this.fs = fs;
183        }
184
185        /**
186         * Get the FileSystem to access the URI
187         * @return FileSystem to access the URI
188         */
189        public FileSystem getFileSystem() {
190            return fs;
191        }
192    }
193
194}