001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.IOException; 021 import java.net.URISyntaxException; 022 import java.util.List; 023 024 import org.apache.hadoop.fs.FileStatus; 025 import org.apache.hadoop.fs.FileSystem; 026 import org.apache.hadoop.fs.Path; 027 import org.apache.hadoop.fs.permission.FsPermission; 028 import org.apache.hadoop.mapred.JobConf; 029 import org.apache.oozie.action.ActionExecutor; 030 import org.apache.oozie.action.ActionExecutorException; 031 import org.apache.oozie.client.WorkflowAction; 032 import org.apache.oozie.service.HadoopAccessorException; 033 import org.apache.oozie.service.HadoopAccessorService; 034 import org.apache.oozie.service.Services; 035 import org.apache.oozie.util.XConfiguration; 036 import org.apache.oozie.util.XmlUtils; 037 import org.jdom.Element; 038 039 /** 040 * File system action executor. <p/> This executes the file system mkdir, move and delete commands 041 */ 042 public class FsActionExecutor extends ActionExecutor { 043 044 public FsActionExecutor() { 045 super("fs"); 046 } 047 048 Path getPath(Element element, String attribute) { 049 String str = element.getAttributeValue(attribute).trim(); 050 return new Path(str); 051 } 052 053 void validatePath(Path path, boolean withScheme) throws ActionExecutorException { 054 String scheme = path.toUri().getScheme(); 055 if (withScheme) { 056 if (scheme == null) { 057 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001", 058 "Missing scheme in path [{0}]", path); 059 } 060 else { 061 if (!scheme.equals("hdfs")) { 062 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002", 063 "Scheme [{0}] not supported in path [{1}]", scheme, path); 064 } 065 } 066 } 067 else { 068 if (scheme != null) { 069 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS003", 070 "Scheme [{0}] not allowed in path [{1}]", scheme, path); 071 } 072 } 073 } 074 075 void validateSameNN(Path source, Path dest) throws ActionExecutorException { 076 Path destPath = new Path(source, dest); 077 String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority(); 078 String s = source.toUri().getScheme() + source.toUri().getAuthority(); 079 080 //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN 081 if(!t.equals(s)) { 082 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007", 083 "move, target NN URI different from that of source", dest); 084 } 085 } 086 087 @SuppressWarnings("unchecked") 088 void doOperations(Context context, Element element) throws ActionExecutorException { 089 try { 090 FileSystem fs = context.getAppFileSystem(); 091 boolean recovery = fs.exists(getRecoveryPath(context)); 092 if (!recovery) { 093 fs.mkdirs(getRecoveryPath(context)); 094 } 095 for (Element commandElement : (List<Element>) element.getChildren()) { 096 String command = commandElement.getName(); 097 if (command.equals("mkdir")) { 098 Path path = getPath(commandElement, "path"); 099 mkdir(context, path); 100 } 101 else { 102 if (command.equals("delete")) { 103 Path path = getPath(commandElement, "path"); 104 delete(context, path); 105 } 106 else { 107 if (command.equals("move")) { 108 Path source = getPath(commandElement, "source"); 109 Path target = getPath(commandElement, "target"); 110 move(context, source, target, recovery); 111 } 112 else { 113 if (command.equals("chmod")) { 114 Path path = getPath(commandElement, "path"); 115 String str = commandElement.getAttributeValue("dir-files"); 116 boolean dirFiles = (str == null) || Boolean.parseBoolean(str); 117 String permissionsMask = commandElement.getAttributeValue("permissions").trim(); 118 chmod(context, path, permissionsMask, dirFiles); 119 } 120 } 121 } 122 } 123 } 124 } 125 catch (Exception ex) { 126 throw convertException(ex); 127 } 128 } 129 130 /** 131 * @param path 132 * @param context 133 * @return FileSystem 134 * @throws HadoopAccessorException 135 */ 136 private FileSystem getFileSystemFor(Path path, Context context) throws HadoopAccessorException { 137 String user = context.getWorkflow().getUser(); 138 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 139 JobConf conf = has.createJobConf(path.toUri().getAuthority()); 140 XConfiguration.copy(context.getProtoActionConf(), conf); 141 return has.createFileSystem(user, path.toUri(), conf); 142 } 143 144 /** 145 * @param path 146 * @param user 147 * @param group 148 * @return FileSystem 149 * @throws HadoopAccessorException 150 */ 151 private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException { 152 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 153 JobConf jobConf = has.createJobConf(path.toUri().getAuthority()); 154 return has.createFileSystem(user, path.toUri(), jobConf); 155 } 156 157 void mkdir(Context context, Path path) throws ActionExecutorException { 158 try { 159 validatePath(path, true); 160 FileSystem fs = getFileSystemFor(path, context); 161 162 if (!fs.exists(path)) { 163 if (!fs.mkdirs(path)) { 164 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004", 165 "mkdir, path [{0}] could not create directory", path); 166 } 167 } 168 } 169 catch (Exception ex) { 170 throw convertException(ex); 171 } 172 } 173 174 /** 175 * Delete path 176 * 177 * @param context 178 * @param path 179 * @throws ActionExecutorException 180 */ 181 public void delete(Context context, Path path) throws ActionExecutorException { 182 try { 183 validatePath(path, true); 184 FileSystem fs = getFileSystemFor(path, context); 185 186 if (fs.exists(path)) { 187 if (!fs.delete(path, true)) { 188 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 189 "delete, path [{0}] could not delete path", path); 190 } 191 } 192 } 193 catch (Exception ex) { 194 throw convertException(ex); 195 } 196 } 197 198 /** 199 * Delete path 200 * 201 * @param user 202 * @param group 203 * @param path 204 * @throws ActionExecutorException 205 */ 206 public void delete(String user, String group, Path path) throws ActionExecutorException { 207 try { 208 validatePath(path, true); 209 FileSystem fs = getFileSystemFor(path, user); 210 211 if (fs.exists(path)) { 212 if (!fs.delete(path, true)) { 213 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 214 "delete, path [{0}] could not delete path", path); 215 } 216 } 217 } 218 catch (Exception ex) { 219 throw convertException(ex); 220 } 221 } 222 223 /** 224 * Move source to target 225 * 226 * @param context 227 * @param source 228 * @param target 229 * @param recovery 230 * @throws ActionExecutorException 231 */ 232 public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException { 233 try { 234 validatePath(source, true); 235 validateSameNN(source, target); 236 FileSystem fs = getFileSystemFor(source, context); 237 238 if (!fs.exists(source) && !recovery) { 239 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006", 240 "move, source path [{0}] does not exist", source); 241 } 242 243 if (!fs.rename(source, target) && !recovery) { 244 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008", 245 "move, could not move [{0}] to [{1}]", source, target); 246 } 247 } 248 catch (Exception ex) { 249 throw convertException(ex); 250 } 251 } 252 253 void chmod(Context context, Path path, String permissions, boolean dirFiles) throws ActionExecutorException { 254 try { 255 validatePath(path, true); 256 FileSystem fs = getFileSystemFor(path, context); 257 258 if (!fs.exists(path)) { 259 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", 260 "chmod, path [{0}] does not exist", path); 261 } 262 263 FileStatus pathStatus = fs.getFileStatus(path); 264 265 Path[] paths; 266 if (dirFiles && pathStatus.isDir()) { 267 FileStatus[] filesStatus = fs.listStatus(path); 268 paths = new Path[filesStatus.length]; 269 for (int i = 0; i < filesStatus.length; i++) { 270 paths[i] = filesStatus[i].getPath(); 271 } 272 } 273 else { 274 paths = new Path[]{path}; 275 } 276 277 FsPermission newFsPermission = createShortPermission(permissions, path); 278 fs.setPermission(path, newFsPermission); 279 for (Path p : paths) { 280 fs.setPermission(p, newFsPermission); 281 } 282 } 283 catch (Exception ex) { 284 throw convertException(ex); 285 } 286 } 287 288 FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException { 289 if (permissions.length() == 3) { 290 char user = permissions.charAt(0); 291 char group = permissions.charAt(1); 292 char other = permissions.charAt(2); 293 int useri = user - '0'; 294 int groupi = group - '0'; 295 int otheri = other - '0'; 296 int mask = useri * 100 + groupi * 10 + otheri; 297 short omask = Short.parseShort(Integer.toString(mask), 8); 298 return new FsPermission(omask); 299 } 300 else { 301 if (permissions.length() == 10) { 302 return FsPermission.valueOf(permissions); 303 } 304 else { 305 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010", 306 "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions); 307 } 308 } 309 } 310 311 @Override 312 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 313 } 314 315 @Override 316 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 317 } 318 319 @Override 320 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 321 try { 322 context.setStartData("-", "-", "-"); 323 Element actionXml = XmlUtils.parseXml(action.getConf()); 324 doOperations(context, actionXml); 325 context.setExecutionData("OK", null); 326 } 327 catch (Exception ex) { 328 throw convertException(ex); 329 } 330 } 331 332 @Override 333 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 334 String externalStatus = action.getExternalStatus(); 335 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK : 336 WorkflowAction.Status.ERROR; 337 context.setEndData(status, getActionSignal(status)); 338 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) { 339 try { 340 FileSystem fs = context.getAppFileSystem(); 341 fs.delete(context.getActionDir(), true); 342 } 343 catch (Exception ex) { 344 throw convertException(ex); 345 } 346 } 347 } 348 349 @Override 350 public boolean isCompleted(String externalStatus) { 351 return true; 352 } 353 354 /** 355 * @param context 356 * @return 357 * @throws HadoopAccessorException 358 * @throws IOException 359 * @throws URISyntaxException 360 */ 361 public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException { 362 return new Path(context.getActionDir(), "fs-" + context.getRecoveryId()); 363 } 364 365 }