001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 * 
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 * 
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
033
/**
 * EL functions for the fs action executor: existence, directory, size and block size checks on file system paths.
 */
037public class FsELFunctions {
038
039    private static FileSystem getFileSystem(URI uri) throws HadoopAccessorException {
040        WorkflowJob workflow = DagELFunctions.getWorkflow();
041        String user = workflow.getUser();
042        String group = workflow.getGroup();
043        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
044        JobConf conf = has.createJobConf(uri.getAuthority());
045        return has.createFileSystem(user, uri, conf);
046    }
047
048    /**
049     * Get file status.
050     *
051     * @param pathUri fs path uri
052     * @return file status
053     * @throws URISyntaxException
054     * @throws IOException
055     * @throws Exception
056     */
057    private static FileStatus getFileStatus(String pathUri) throws Exception {
058        URI uri = new URI(pathUri);
059        String path = uri.getPath();
060        FileSystem fs = getFileSystem(uri);
061        Path p = new Path(path);
062        return fs.exists(p) ? fs.getFileStatus(p) : null;
063    }
064
065    /**
066     * Return if a path exists.
067     *
068     * @param pathUri file system path uri.
069     * @return <code>true</code> if the path exists, <code>false</code> if it does not.
070     * @throws Exception
071     */
072    public static boolean fs_exists(String pathUri) throws Exception {
073        URI uri = new URI(pathUri);
074        String path = uri.getPath();
075        FileSystem fs = getFileSystem(uri);
076        return fs.exists(new Path(path));
077    }
078
079    /**
080     * Return if a path is a directory.
081     *
082     * @param pathUri fs path uri.
083     * @return <code>true</code> if the path exists and it is a directory, <code>false</code> otherwise.
084     * @throws Exception
085     */
086    public static boolean fs_isDir(String pathUri) throws Exception {
087        boolean isDir = false;
088        FileStatus fileStatus = getFileStatus(pathUri);
089        if (fileStatus != null) {
090            isDir = fileStatus.isDir();
091        }
092        return isDir;
093    }
094
095    /**
096     * Return the len of a file.
097     *
098     * @param pathUri file system path uri.
099     * @return the file len in bytes, -1 if the file does not exist or if it is a directory.
100     * @throws Exception
101     */
102    public static long fs_fileSize(String pathUri) throws Exception {
103        long len = -1;
104        FileStatus fileStatus = getFileStatus(pathUri);
105        if (fileStatus != null) {
106            len = fileStatus.getLen();
107        }
108        return len;
109    }
110
111    /**
112     * Return the size of all files in the directory, it is not recursive.
113     *
114     * @param pathUri file system path uri.
115     * @return the size of all files in the directory, -1 if the directory does not exist or if it is a file.
116     * @throws Exception
117     */
118    public static long fs_dirSize(String pathUri) throws Exception {
119        URI uri = new URI(pathUri);
120        String path = uri.getPath();
121        long size = -1;
122        try {
123            FileSystem fs = getFileSystem(uri);
124            Path p = new Path(path);
125            if (fs.exists(p) && !fs.isFile(p)) {
126                FileStatus[] stati = fs.listStatus(p);
127                size = 0;
128                if (stati != null) {
129                    for (FileStatus status : stati) {
130                        if (!status.isDir()) {
131                            size += status.getLen();
132                        }
133                    }
134                }
135            }
136        }
137        catch (Exception ex) {
138            throw new RuntimeException(ex);
139        }
140        return size;
141    }
142
143    /**
144     * Return the file block size in bytes.
145     *
146     * @param pathUri file system path uri.
147     * @return the block size of the file in bytes, -1 if the file does not exist or if it is a directory.
148     * @throws Exception
149     */
150    public static long fs_blockSize(String pathUri) throws Exception {
151        long blockSize = -1;
152        FileStatus fileStatus = getFileStatus(pathUri);
153        if (fileStatus != null) {
154            blockSize = fileStatus.getBlockSize();
155        }
156        return blockSize;
157    }
158
159}