001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.hadoop; 020 021import java.io.IOException; 022import java.net.URI; 023import java.net.URISyntaxException; 024 025import org.apache.hadoop.fs.FileStatus; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.fs.PathFilter; 029import org.apache.hadoop.mapred.JobConf; 030import org.apache.oozie.DagELFunctions; 031import org.apache.oozie.action.ActionExecutorException; 032import org.apache.oozie.client.WorkflowJob; 033import org.apache.oozie.service.ConfigurationService; 034import org.apache.oozie.service.HadoopAccessorException; 035import org.apache.oozie.service.Services; 036import org.apache.oozie.service.HadoopAccessorService; 037 038/** 039 * EL function for fs action executor. 040 */ 041public class FsELFunctions { 042 043 private static FileSystem getFileSystem(URI uri) throws HadoopAccessorException { 044 WorkflowJob workflow = DagELFunctions.getWorkflow(); 045 String user = workflow.getUser(); 046 String group = workflow.getGroup(); 047 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 048 JobConf conf = has.createJobConf(uri.getAuthority()); 049 return has.createFileSystem(user, uri, conf); 050 } 051 052 /** 053 * Get file status. 054 * 055 * @param pathUri fs path uri 056 * @return file status 057 * @throws URISyntaxException 058 * @throws IOException 059 * @throws Exception 060 */ 061 private static FileStatus getFileStatus(String pathUri) throws Exception { 062 URI uri = new URI(pathUri); 063 String path = uri.getPath(); 064 FileSystem fs = getFileSystem(uri); 065 Path p = new Path(path); 066 return fs.exists(p) ? fs.getFileStatus(p) : null; 067 } 068 069 /** 070 * Return if a path exists. 071 * 072 * @param pathUri file system path uri. 073 * @return <code>true</code> if the path exists, <code>false</code> if it does not. 074 * @throws Exception 075 */ 076 public static boolean fs_exists(String pathUri) throws Exception { 077 Path path = new Path(pathUri); 078 FileSystem fs = getFileSystem(path.toUri()); 079 FileStatus[] pathArr; 080 try { 081 pathArr = fs.globStatus(path, new FSPathFilter()); 082 } 083 catch (ReachingGlobMaxException e) { 084 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013", 085 "too many globbed files/dirs to do FS operation"); 086 } 087 return (pathArr != null && pathArr.length > 0); 088 } 089 090 /** 091 * Return if a path is a directory. 092 * 093 * @param pathUri fs path uri. 094 * @return <code>true</code> if the path exists and it is a directory, <code>false</code> otherwise. 095 * @throws Exception 096 */ 097 public static boolean fs_isDir(String pathUri) throws Exception { 098 boolean isDir = false; 099 FileStatus fileStatus = getFileStatus(pathUri); 100 if (fileStatus != null) { 101 isDir = fileStatus.isDir(); 102 } 103 return isDir; 104 } 105 106 /** 107 * Return the len of a file. 108 * 109 * @param pathUri file system path uri. 110 * @return the file len in bytes, -1 if the file does not exist or if it is a directory. 111 * @throws Exception 112 */ 113 public static long fs_fileSize(String pathUri) throws Exception { 114 long len = -1; 115 FileStatus fileStatus = getFileStatus(pathUri); 116 if (fileStatus != null) { 117 len = fileStatus.getLen(); 118 } 119 return len; 120 } 121 122 /** 123 * Return the size of all files in the directory, it is not recursive. 124 * 125 * @param pathUri file system path uri. 126 * @return the size of all files in the directory, -1 if the directory does not exist or if it is a file. 127 * @throws Exception 128 */ 129 public static long fs_dirSize(String pathUri) throws Exception { 130 URI uri = new URI(pathUri); 131 String path = uri.getPath(); 132 long size = -1; 133 try { 134 FileSystem fs = getFileSystem(uri); 135 Path p = new Path(path); 136 if (fs.exists(p) && !fs.isFile(p)) { 137 FileStatus[] stati = fs.listStatus(p); 138 size = 0; 139 if (stati != null) { 140 for (FileStatus status : stati) { 141 if (!status.isDir()) { 142 size += status.getLen(); 143 } 144 } 145 } 146 } 147 } 148 catch (Exception ex) { 149 throw new RuntimeException(ex); 150 } 151 return size; 152 } 153 154 /** 155 * Return the file block size in bytes. 156 * 157 * @param pathUri file system path uri. 158 * @return the block size of the file in bytes, -1 if the file does not exist or if it is a directory. 159 * @throws Exception 160 */ 161 public static long fs_blockSize(String pathUri) throws Exception { 162 long blockSize = -1; 163 FileStatus fileStatus = getFileStatus(pathUri); 164 if (fileStatus != null) { 165 blockSize = fileStatus.getBlockSize(); 166 } 167 return blockSize; 168 } 169 170 static class FSPathFilter implements PathFilter { 171 int count = 0; 172 int globMax = Integer.MAX_VALUE; 173 public FSPathFilter() { 174 globMax = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX); 175 } 176 @Override 177 public boolean accept(Path p) { 178 count++; 179 if(count > globMax) { 180 throw new ReachingGlobMaxException(); 181 } 182 return true; 183 } 184 } 185 186 /** 187 * ReachingGlobMaxException thrown when globbed file count exceeds the limit 188 */ 189 static class ReachingGlobMaxException extends RuntimeException { 190 } 191 192}