001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.hadoop;
020
021import java.io.IOException;
022import java.net.URI;
023import java.net.URISyntaxException;
024
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.fs.PathFilter;
029import org.apache.hadoop.mapred.JobConf;
030import org.apache.oozie.DagELFunctions;
031import org.apache.oozie.action.ActionExecutorException;
032import org.apache.oozie.client.WorkflowJob;
033import org.apache.oozie.service.ConfigurationService;
034import org.apache.oozie.service.HadoopAccessorException;
035import org.apache.oozie.service.Services;
036import org.apache.oozie.service.HadoopAccessorService;
037
038/**
039 * EL function for fs action executor.
040 */
041public class FsELFunctions {
042
043    private static FileSystem getFileSystem(URI uri) throws HadoopAccessorException {
044        WorkflowJob workflow = DagELFunctions.getWorkflow();
045        String user = workflow.getUser();
046        String group = workflow.getGroup();
047        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
048        JobConf conf = has.createJobConf(uri.getAuthority());
049        return has.createFileSystem(user, uri, conf);
050    }
051
052    /**
053     * Get file status.
054     *
055     * @param pathUri fs path uri
056     * @return file status
057     * @throws URISyntaxException
058     * @throws IOException
059     * @throws Exception
060     */
061    private static FileStatus getFileStatus(String pathUri) throws Exception {
062        URI uri = new URI(pathUri);
063        String path = uri.getPath();
064        FileSystem fs = getFileSystem(uri);
065        Path p = new Path(path);
066        return fs.exists(p) ? fs.getFileStatus(p) : null;
067    }
068
069    /**
070     * Return if a path exists.
071     *
072     * @param pathUri file system path uri.
073     * @return <code>true</code> if the path exists, <code>false</code> if it does not.
074     * @throws Exception
075     */
076    public static boolean fs_exists(String pathUri) throws Exception {
077        Path path = new Path(pathUri);
078        FileSystem fs = getFileSystem(path.toUri());
079        FileStatus[] pathArr;
080        try {
081            pathArr = fs.globStatus(path, new FSPathFilter());
082        }
083        catch (ReachingGlobMaxException e) {
084            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013",
085                    "too many globbed files/dirs to do FS operation");
086        }
087        return (pathArr != null && pathArr.length > 0);
088    }
089
090    /**
091     * Return if a path is a directory.
092     *
093     * @param pathUri fs path uri.
094     * @return <code>true</code> if the path exists and it is a directory, <code>false</code> otherwise.
095     * @throws Exception
096     */
097    public static boolean fs_isDir(String pathUri) throws Exception {
098        boolean isDir = false;
099        FileStatus fileStatus = getFileStatus(pathUri);
100        if (fileStatus != null) {
101            isDir = fileStatus.isDir();
102        }
103        return isDir;
104    }
105
106    /**
107     * Return the len of a file.
108     *
109     * @param pathUri file system path uri.
110     * @return the file len in bytes, -1 if the file does not exist or if it is a directory.
111     * @throws Exception
112     */
113    public static long fs_fileSize(String pathUri) throws Exception {
114        long len = -1;
115        FileStatus fileStatus = getFileStatus(pathUri);
116        if (fileStatus != null) {
117            len = fileStatus.getLen();
118        }
119        return len;
120    }
121
122    /**
123     * Return the size of all files in the directory, it is not recursive.
124     *
125     * @param pathUri file system path uri.
126     * @return the size of all files in the directory, -1 if the directory does not exist or if it is a file.
127     * @throws Exception
128     */
129    public static long fs_dirSize(String pathUri) throws Exception {
130        URI uri = new URI(pathUri);
131        String path = uri.getPath();
132        long size = -1;
133        try {
134            FileSystem fs = getFileSystem(uri);
135            Path p = new Path(path);
136            if (fs.exists(p) && !fs.isFile(p)) {
137                FileStatus[] stati = fs.listStatus(p);
138                size = 0;
139                if (stati != null) {
140                    for (FileStatus status : stati) {
141                        if (!status.isDir()) {
142                            size += status.getLen();
143                        }
144                    }
145                }
146            }
147        }
148        catch (Exception ex) {
149            throw new RuntimeException(ex);
150        }
151        return size;
152    }
153
154    /**
155     * Return the file block size in bytes.
156     *
157     * @param pathUri file system path uri.
158     * @return the block size of the file in bytes, -1 if the file does not exist or if it is a directory.
159     * @throws Exception
160     */
161    public static long fs_blockSize(String pathUri) throws Exception {
162        long blockSize = -1;
163        FileStatus fileStatus = getFileStatus(pathUri);
164        if (fileStatus != null) {
165            blockSize = fileStatus.getBlockSize();
166        }
167        return blockSize;
168    }
169
170    static class FSPathFilter implements PathFilter {
171        int count = 0;
172        int globMax = Integer.MAX_VALUE;
173        public FSPathFilter() {
174            globMax = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX);
175        }
176        @Override
177        public boolean accept(Path p) {
178            count++;
179            if(count > globMax) {
180                throw new ReachingGlobMaxException();
181            }
182            return true;
183        }
184    }
185
186    /**
187     * ReachingGlobMaxException thrown when globbed file count exceeds the limit
188     */
189    static class ReachingGlobMaxException extends RuntimeException {
190    }
191
192}