001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.dependency;
020
021import java.io.IOException;
022import java.net.URI;
023import java.util.List;
024import java.util.Set;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.oozie.ErrorCode;
030import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
031import org.apache.oozie.action.hadoop.LauncherURIHandler;
032import org.apache.oozie.service.HadoopAccessorException;
033import org.apache.oozie.service.HadoopAccessorService;
034import org.apache.oozie.service.Services;
035
036public class FSURIHandler implements URIHandler {
037
038    private HadoopAccessorService service;
039    private Set<String> supportedSchemes;
040    private List<Class<?>> classesToShip;
041
042    @Override
043    public void init(Configuration conf) {
044        service = Services.get().get(HadoopAccessorService.class);
045        supportedSchemes = service.getSupportedSchemes();
046        classesToShip = new FSLauncherURIHandler().getClassesForLauncher();
047    }
048
049    @Override
050    public Set<String> getSupportedSchemes() {
051        return supportedSchemes;
052    }
053
054    @Override
055    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
056        return FSLauncherURIHandler.class;
057    }
058
059    @Override
060    public List<Class<?>> getClassesForLauncher() {
061        return classesToShip;
062    }
063
064    @Override
065    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
066        return DependencyType.PULL;
067    }
068
069    @Override
070    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
071            throws URIHandlerException {
072        throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
073    }
074
075    @Override
076    public boolean unregisterFromNotification(URI uri, String actionID) {
077        throw new UnsupportedOperationException("Notifications are not supported for " + uri.getScheme());
078    }
079
080    @Override
081    public Context getContext(URI uri, Configuration conf, String user, boolean readOnly) throws URIHandlerException {
082        FileSystem fs = getFileSystem(uri, conf, user);
083        return new FSContext(conf, user, fs);
084    }
085
086    @Override
087    public boolean exists(URI uri, Context context) throws URIHandlerException {
088        try {
089            FileSystem fs = ((FSContext) context).getFileSystem();
090            return fs.exists(getNormalizedPath(uri));
091        }
092        catch (IOException e) {
093            throw new HadoopAccessorException(ErrorCode.E0902, e);
094        }
095    }
096
097    @Override
098    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
099        try {
100            FileSystem fs = getFileSystem(uri, conf, user);
101            return fs.exists(getNormalizedPath(uri));
102        }
103        catch (HadoopAccessorException e) {
104            if (ErrorCode.E0902.equals(e.getErrorCode()) && e.getMessage() != null
105                    && e.getMessage().indexOf("Invalid path for the Har Filesystem. No index file") != -1) {
106                return false;
107            }
108            else {
109                throw e;
110            }
111        }
112        catch (IOException e) {
113            throw new HadoopAccessorException(ErrorCode.E0902, e);
114        }
115    }
116
117    @Override
118    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
119        if (doneFlag.length() > 0) {
120            uri += "/" + doneFlag;
121        }
122        return uri;
123    }
124
125    @Override
126    public String getURIWithoutDoneFlag(String uri, String doneFlag) throws URIHandlerException {
127        if (doneFlag.length() > 0 && uri.endsWith(doneFlag)) {
128            return uri.substring(0, uri.lastIndexOf("/" + doneFlag));
129        }
130        return uri;
131    }
132
133
134    @Override
135    public void validate(String uri) throws URIHandlerException {
136    }
137
138    @Override
139    public void destroy() {
140
141    }
142
143    @Override
144    public void delete(URI uri, Context context) throws URIHandlerException {
145        FileSystem fs = ((FSContext) context).getFileSystem();
146        Path path = new Path(uri);
147        try {
148            if (fs.exists(path)) {
149                if (!fs.delete(path, true)) {
150                    throw new URIHandlerException(ErrorCode.E0907, path.toString());
151                }
152            }
153        }
154        catch (IOException e) {
155            throw new URIHandlerException(ErrorCode.E0907, path.toString());
156        }
157    }
158
159    @Override
160    public void delete(URI uri, Configuration conf, String user) throws URIHandlerException {
161        Path path = new Path(uri);
162        FileSystem fs = getFileSystem(uri, conf, user);
163        try{
164            if (fs.exists(path)) {
165                if (!fs.delete(path, true)) {
166                    throw new URIHandlerException(ErrorCode.E0907, path.toString());
167                }
168            }
169        } catch (IOException e){
170            throw new URIHandlerException(ErrorCode.E0907, path.toString());
171        }
172    }
173
174    private Path getNormalizedPath(URI uri) {
175        // Normalizes uri path replacing // with / in the path which users specify by mistake
176        return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
177    }
178
179    private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
180        if (user == null) {
181            throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
182        }
183        Configuration fsConf = service.createJobConf(uri.getAuthority());
184        return service.createFileSystem(user, uri, fsConf);
185    }
186
187    static class FSContext extends Context {
188
189        private FileSystem fs;
190
191        /**
192         * Create a FSContext that can be used to access a filesystem URI
193         *
194         * @param conf Configuration to access the URI
195         * @param user name of the user the URI should be accessed as
196         * @param fs FileSystem to access
197         */
198        public FSContext(Configuration conf, String user, FileSystem fs) {
199            super(conf, user);
200            this.fs = fs;
201        }
202
203        /**
204         * Get the FileSystem to access the URI
205         * @return FileSystem to access the URI
206         */
207        public FileSystem getFileSystem() {
208            return fs;
209        }
210    }
211
212}