001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.net.URI;
022    import java.net.URISyntaxException;
023    
024    import org.apache.hadoop.fs.FileStatus;
025    import org.apache.hadoop.fs.FileSystem;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.hadoop.mapred.JobConf;
028    import org.apache.oozie.DagELFunctions;
029    import org.apache.oozie.client.WorkflowJob;
030    import org.apache.oozie.service.HadoopAccessorException;
031    import org.apache.oozie.service.Services;
032    import org.apache.oozie.service.HadoopAccessorService;
033    
034    /**
035     * EL function for fs action executor.
036     */
037    public class FsELFunctions {
038    
039        private static FileSystem getFileSystem(URI uri) throws HadoopAccessorException {
040            WorkflowJob workflow = DagELFunctions.getWorkflow();
041            String user = workflow.getUser();
042            String group = workflow.getGroup();
043            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
044            JobConf conf = has.createJobConf(uri.getAuthority());
045            return has.createFileSystem(user, uri, conf);
046        }
047    
048        /**
049         * Get file status.
050         *
051         * @param pathUri fs path uri
052         * @return file status
053         * @throws URISyntaxException
054         * @throws IOException
055         * @throws Exception
056         */
057        private static FileStatus getFileStatus(String pathUri) throws Exception {
058            URI uri = new URI(pathUri);
059            String path = uri.getPath();
060            FileSystem fs = getFileSystem(uri);
061            Path p = new Path(path);
062            return fs.exists(p) ? fs.getFileStatus(p) : null;
063        }
064    
065        /**
066         * Return if a path exists.
067         *
068         * @param pathUri file system path uri.
069         * @return <code>true</code> if the path exists, <code>false</code> if it does not.
070         * @throws Exception
071         */
072        public static boolean fs_exists(String pathUri) throws Exception {
073            URI uri = new URI(pathUri);
074            String path = uri.getPath();
075            FileSystem fs = getFileSystem(uri);
076            return fs.exists(new Path(path));
077        }
078    
079        /**
080         * Return if a path is a directory.
081         *
082         * @param pathUri fs path uri.
083         * @return <code>true</code> if the path exists and it is a directory, <code>false</code> otherwise.
084         * @throws Exception
085         */
086        public static boolean fs_isDir(String pathUri) throws Exception {
087            boolean isDir = false;
088            FileStatus fileStatus = getFileStatus(pathUri);
089            if (fileStatus != null) {
090                isDir = fileStatus.isDir();
091            }
092            return isDir;
093        }
094    
095        /**
096         * Return the len of a file.
097         *
098         * @param pathUri file system path uri.
099         * @return the file len in bytes, -1 if the file does not exist or if it is a directory.
100         * @throws Exception
101         */
102        public static long fs_fileSize(String pathUri) throws Exception {
103            long len = -1;
104            FileStatus fileStatus = getFileStatus(pathUri);
105            if (fileStatus != null) {
106                len = fileStatus.getLen();
107            }
108            return len;
109        }
110    
111        /**
112         * Return the size of all files in the directory, it is not recursive.
113         *
114         * @param pathUri file system path uri.
115         * @return the size of all files in the directory, -1 if the directory does not exist or if it is a file.
116         * @throws Exception
117         */
118        public static long fs_dirSize(String pathUri) throws Exception {
119            URI uri = new URI(pathUri);
120            String path = uri.getPath();
121            long size = -1;
122            try {
123                FileSystem fs = getFileSystem(uri);
124                Path p = new Path(path);
125                if (fs.exists(p) && !fs.isFile(p)) {
126                    FileStatus[] stati = fs.listStatus(p);
127                    size = 0;
128                    if (stati != null) {
129                        for (FileStatus status : stati) {
130                            if (!status.isDir()) {
131                                size += status.getLen();
132                            }
133                        }
134                    }
135                }
136            }
137            catch (Exception ex) {
138                throw new RuntimeException(ex);
139            }
140            return size;
141        }
142    
143        /**
144         * Return the file block size in bytes.
145         *
146         * @param pathUri file system path uri.
147         * @return the block size of the file in bytes, -1 if the file does not exist or if it is a directory.
148         * @throws Exception
149         */
150        public static long fs_blockSize(String pathUri) throws Exception {
151            long blockSize = -1;
152            FileStatus fileStatus = getFileStatus(pathUri);
153            if (fileStatus != null) {
154                blockSize = fileStatus.getBlockSize();
155            }
156            return blockSize;
157        }
158    
159    }