/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;

/**
 * File system action executor.
 * <p>
 * This executes the file system mkdir, delete, move, chmod and touchz commands.
 */
public class FsActionExecutor extends ActionExecutor {

    public FsActionExecutor() {
        super("fs");
    }

    /**
     * Read an attribute of a command element and wrap its trimmed value in a {@link Path}.
     *
     * @param element command element to read from
     * @param attribute name of the attribute holding the path; it must be present on the element
     * @return the path built from the trimmed attribute value
     */
    Path getPath(Element element, String attribute) {
        String str = element.getAttributeValue(attribute).trim();
        return new Path(str);
    }

    /**
     * Check that a path does, or does not, carry a URI scheme.
     *
     * @param path path to check
     * @param withScheme if <code>true</code> the path must have a scheme and that scheme is checked through
     *        {@link HadoopAccessorService#checkSupportedFilesystem}; if <code>false</code> the path must not
     *        have a scheme
     * @throws ActionExecutorException FS001 if a required scheme is missing, FS002 if a scheme is present but not
     *         allowed, or the converted {@link HadoopAccessorException} raised by the filesystem check
     */
    void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
        try {
            String scheme = path.toUri().getScheme();
            if (withScheme) {
                if (scheme == null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
                                                      "Missing scheme in path [{0}]", path);
                }
                else {
                    Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri());
                }
            }
            else {
                if (scheme != null) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
                                                      "Scheme [{0}] not allowed in path [{1}]", scheme, path);
                }
            }
        }
        catch (HadoopAccessorException hex) {
            throw convertException(hex);
        }
    }

    /**
     * Turn a command path into the path the command will operate on, using the action's name node when the path
     * itself does not name one.
     *
     * @param nameNode name node of the action, or <code>null</code> if none was given
     * @param path path taken from the command
     * @param withScheme passed to {@link #validatePath(Path, boolean)} whenever the path is validated as-is
     * @return the path, prefixed with the name node's scheme and authority if it had none of its own
     * @throws ActionExecutorException FS011 if the path lacks a scheme or authority and is relative, or any error
     *         raised by {@link #validatePath(Path, boolean)}
     */
    Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
        // If no nameNode is given, validate the path as-is and return it as-is
        if (nameNode == null) {
            validatePath(path, withScheme);
            return path;
        }

        String pathScheme = path.toUri().getScheme();
        String pathAuthority = path.toUri().getAuthority();

        // If the path doesn't have a scheme or authority, use the nameNode which should have already been
        // verified earlier
        if (pathScheme == null || pathAuthority == null) {
            if (!path.isAbsolute()) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011",
                                                  "Path [{0}] cannot be relative", path);
            }
            String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
            return new Path(nameNodeSchemeAuthority + path.toString());
        }

        // If the path has a scheme and authority, but it is not the nameNode, then validate the path as-is.
        // If it is the nameNode, then it should have already been verified earlier.
        // The comparison is made on the path's values (non-null in this branch) so that a name node without an
        // authority is treated as "different" instead of causing a NullPointerException.
        boolean sameAsNameNode = pathScheme.equals(nameNode.toUri().getScheme())
                && pathAuthority.equals(nameNode.toUri().getAuthority());
        if (!sameAsNameNode) {
            validatePath(path, withScheme);
        }
        return path;
    }

    /**
     * Check that the target of a move lives on the same name node as its source. The target is first resolved
     * against the source, then scheme and authority of both are compared.
     *
     * @param source source path of the move
     * @param dest target path of the move
     * @throws ActionExecutorException FS007 if scheme and authority of the resolved target differ from the source's
     */
    void validateSameNN(Path source, Path dest) throws ActionExecutorException {
        Path destPath = new Path(source, dest);
        String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
        String s = source.toUri().getScheme() + source.toUri().getAuthority();

        //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
        if (!t.equals(s)) {
            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
                                              "move, target NN URI different from that of source", dest);
        }
    }

    /**
     * Run every file system command found among the children of the action element, in document order.
     * <p>
     * Before running the commands this creates the recovery directory if it does not exist yet; if it already
     * exists the run is treated as a recovery, which makes {@link #move} tolerant of an already-moved source.
     * Child elements that are not one of mkdir, delete, move, chmod or touchz are skipped.
     *
     * @param context executor context
     * @param element root element of the fs action
     * @throws ActionExecutorException if any step fails; every exception is passed through convertException
     */
    @SuppressWarnings("unchecked")
    void doOperations(Context context, Element element) throws ActionExecutorException {
        try {
            FileSystem fs = context.getAppFileSystem();
            boolean recovery = fs.exists(getRecoveryPath(context));
            if (!recovery) {
                fs.mkdirs(getRecoveryPath(context));
            }

            Path nameNodePath = null;
            Element nameNodeElement = element.getChild("name-node", element.getNamespace());
            if (nameNodeElement != null) {
                String nameNode = nameNodeElement.getTextTrim();
                if (nameNode != null) {
                    nameNodePath = new Path(nameNode);
                    // Verify the name node now
                    validatePath(nameNodePath, true);
                }
            }

            XConfiguration fsConf = new XConfiguration();
            Path appPath = new Path(context.getWorkflow().getAppPath());
            // app path could be a file
            if (fs.isFile(appPath)) {
                appPath = appPath.getParent();
            }
            JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);

            for (Element commandElement : (List<Element>) element.getChildren()) {
                String command = commandElement.getName();
                if (command.equals("mkdir")) {
                    Path path = getPath(commandElement, "path");
                    mkdir(context, fsConf, nameNodePath, path);
                }
                else if (command.equals("delete")) {
                    Path path = getPath(commandElement, "path");
                    delete(context, fsConf, nameNodePath, path);
                }
                else if (command.equals("move")) {
                    Path source = getPath(commandElement, "source");
                    Path target = getPath(commandElement, "target");
                    move(context, fsConf, nameNodePath, source, target, recovery);
                }
                else if (command.equals("chmod")) {
                    Path path = getPath(commandElement, "path");
                    boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null;
                    // dir-files defaults to true when the attribute is absent
                    String str = commandElement.getAttributeValue("dir-files");
                    boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
                    String permissionsMask = commandElement.getAttributeValue("permissions").trim();
                    chmod(context, fsConf, nameNodePath, path, permissionsMask, dirFiles, recursive);
                }
                else if (command.equals("touchz")) {
                    Path path = getPath(commandElement, "path");
                    touchz(context, fsConf, nameNodePath, path);
                }
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Create a file system for the given path on behalf of the workflow user. The job configuration is created
     * for the path's authority, then the proto action configuration and, if given, <code>fsConf</code> are
     * copied into it in that order.
     *
     * @param path path whose URI selects the file system
     * @param context executor context providing the workflow user and the proto action configuration
     * @param fsConf additional configuration to copy in last, may be <code>null</code>
     * @return FileSystem
     * @throws HadoopAccessorException
     */
    private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
        String user = context.getWorkflow().getUser();
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        JobConf conf = has.createJobConf(path.toUri().getAuthority());
        XConfiguration.copy(context.getProtoActionConf(), conf);
        if (fsConf != null) {
            XConfiguration.copy(fsConf, conf);
        }
        return has.createFileSystem(user, path.toUri(), conf);
    }

    /**
     * Create a file system for the given path on behalf of the given user, using a job configuration created for
     * the path's authority with nothing else copied into it.
     *
     * @param path path whose URI selects the file system
     * @param user user to create the file system for
     * @return FileSystem
     * @throws HadoopAccessorException
     */
    private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
        return has.createFileSystem(user, path.toUri(), jobConf);
    }

    /**
     * Create a directory, with no extra configuration and no name node.
     *
     * @param context executor context
     * @param path directory to create
     * @throws ActionExecutorException see {@link #mkdir(Context, XConfiguration, Path, Path)}
     */
    void mkdir(Context context, Path path) throws ActionExecutorException {
        mkdir(context, null, null, path);
    }

    /**
     * Create a directory if the path does not exist yet; an existing path is left untouched.
     *
     * @param context executor context
     * @param fsConf additional file system configuration, may be <code>null</code>
     * @param nameNodePath name node used to complete the path, may be <code>null</code>
     * @param path directory to create
     * @throws ActionExecutorException FS004 if the directory could not be created, or any other failure passed
     *         through convertException
     */
    void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
        try {
            path = resolveToFullPath(nameNodePath, path, true);
            FileSystem fs = getFileSystemFor(path, context, fsConf);

            if (!fs.exists(path)) {
                if (!fs.mkdirs(path)) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
                                                      "mkdir, path [{0}] could not create directory", path);
                }
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Delete path, with no extra configuration and no name node.
     *
     * @param context executor context
     * @param path path to delete
     * @throws ActionExecutorException see {@link #delete(Context, XConfiguration, Path, Path)}
     */
    public void delete(Context context, Path path) throws ActionExecutorException {
        delete(context, null, null, path);
    }

    /**
     * Delete path recursively if it exists; a missing path is not an error.
     *
     * @param context executor context
     * @param fsConf additional file system configuration, may be <code>null</code>
     * @param nameNodePath name node used to complete the path, may be <code>null</code>
     * @param path path to delete
     * @throws ActionExecutorException FS005 if the path exists but could not be deleted, or any other failure
     *         passed through convertException
     */
    public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
        try {
            path = resolveToFullPath(nameNodePath, path, true);
            FileSystem fs = getFileSystemFor(path, context, fsConf);

            if (fs.exists(path)) {
                if (!fs.delete(path, true)) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
                                                      "delete, path [{0}] could not delete path", path);
                }
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Delete path recursively if it exists, on behalf of an explicit user instead of a workflow context. The
     * path must carry a scheme.
     *
     * @param user user to access the file system as
     * @param group not used by this method
     * @param path path to delete
     * @throws ActionExecutorException FS005 if the path exists but could not be deleted, or any other failure
     *         passed through convertException
     */
    public void delete(String user, String group, Path path) throws ActionExecutorException {
        try {
            validatePath(path, true);
            FileSystem fs = getFileSystemFor(path, user);

            if (fs.exists(path)) {
                if (!fs.delete(path, true)) {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
                                                      "delete, path [{0}] could not delete path", path);
                }
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Move source to target, with no extra configuration and no name node.
     *
     * @param context executor context
     * @param source path to move
     * @param target destination of the move
     * @param recovery whether this run is a recovery
     * @throws ActionExecutorException see {@link #move(Context, XConfiguration, Path, Path, Path, boolean)}
     */
    public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
        move(context, null, null, source, target, recovery);
    }

    /**
     * Move source to target by renaming it on the source's file system.
     *
     * @param context executor context
     * @param fsConf additional file system configuration, may be <code>null</code>
     * @param nameNodePath name node used to complete the source path, may be <code>null</code>
     * @param source path to move
     * @param target destination of the move; it must be on the same name node as the source
     * @param recovery if <code>true</code>, a missing source and a failed rename are not reported as errors
     * @throws ActionExecutorException FS006 if the source does not exist, FS007 if the target is on a different
     *         name node, FS008 if the rename failed, or any other failure passed through convertException
     */
    public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery)
            throws ActionExecutorException {
        try {
            source = resolveToFullPath(nameNodePath, source, true);
            validateSameNN(source, target);
            FileSystem fs = getFileSystemFor(source, context, fsConf);

            if (!fs.exists(source) && !recovery) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
                                                  "move, source path [{0}] does not exist", source);
            }

            // The rename is attempted even during recovery; only its failure is ignored in that case.
            if (!fs.rename(source, target) && !recovery) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
                                                  "move, could not move [{0}] to [{1}]", source, target);
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Change permissions, with no extra configuration and no name node.
     *
     * @param context executor context
     * @param path path to change
     * @param permissions permissions mask
     * @param dirFiles whether the direct children of a directory are changed too
     * @param recursive whether sub-directories are descended into
     * @throws ActionExecutorException see
     *         {@link #chmod(Context, XConfiguration, Path, Path, String, boolean, boolean)}
     */
    void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
        chmod(context, null, null, path, permissions, dirFiles, recursive);
    }

    /**
     * Change the permissions of a path.
     * <p>
     * The path itself is always changed. If it is a directory and <code>dirFiles</code> is set, its direct
     * children are changed as well, and with <code>recursive</code> each child directory is processed the same
     * way before the current level is changed.
     *
     * @param context executor context
     * @param fsConf additional file system configuration, may be <code>null</code>
     * @param nameNodePath name node used to complete the path, may be <code>null</code>
     * @param path path to change
     * @param permissions permissions mask, in one of the forms accepted by
     *        {@link #createShortPermission(String, Path)}
     * @param dirFiles whether the direct children of a directory are changed too
     * @param recursive whether child directories are descended into; only has an effect together with
     *        <code>dirFiles</code>
     * @throws ActionExecutorException FS009 if the path does not exist, FS010 if the mask is invalid, or any
     *         other failure passed through convertException
     */
    void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions, boolean dirFiles,
            boolean recursive) throws ActionExecutorException {
        try {
            path = resolveToFullPath(nameNodePath, path, true);
            FileSystem fs = getFileSystemFor(path, context, fsConf);

            if (!fs.exists(path)) {
                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009",
                                                  "chmod, path [{0}] does not exist", path);
            }

            FileStatus pathStatus = fs.getFileStatus(path);

            // Direct children to change in addition to the path itself; stays empty unless the path is a
            // directory and dirFiles is set, so the path is never changed twice.
            Path[] children = new Path[0];
            if (dirFiles && pathStatus.isDir()) {
                FileStatus[] filesStatus = fs.listStatus(path);
                children = new Path[filesStatus.length];
                for (int i = 0; i < filesStatus.length; i++) {
                    children[i] = filesStatus[i].getPath();
                    if (recursive && filesStatus[i].isDir()) {
                        chmod(context, fsConf, nameNodePath, children[i], permissions, dirFiles, recursive);
                    }
                }
            }

            FsPermission newFsPermission = createShortPermission(permissions, path);
            fs.setPermission(path, newFsPermission);
            for (Path child : children) {
                fs.setPermission(child, newFsPermission);
            }
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Create a zero-length file, with no extra configuration and no name node.
     *
     * @param context executor context
     * @param path file to create
     * @throws ActionExecutorException see {@link #touchz(Context, XConfiguration, Path, Path)}
     */
    void touchz(Context context, Path path) throws ActionExecutorException {
        touchz(context, null, null, path);
    }

    /**
     * Create a zero-length file at the path. If the path already exists it must be a zero-length file, in which
     * case it is created again.
     *
     * @param context executor context
     * @param fsConf additional file system configuration, may be <code>null</code>
     * @param nameNodePath name node used to complete the path, may be <code>null</code>
     * @param path file to create
     * @throws ActionExecutorException if the path is a directory or a non-empty file, or on any other failure;
     *         every exception is passed through convertException
     */
    void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
        try {
            path = resolveToFullPath(nameNodePath, path, true);
            FileSystem fs = getFileSystemFor(path, context, fsConf);

            if (fs.exists(path)) {
                FileStatus st = fs.getFileStatus(path);
                if (st.isDir()) {
                    throw new Exception(path.toString() + " is a directory");
                }
                else if (st.getLen() != 0) {
                    throw new Exception(path.toString() + " must be a zero-length file");
                }
            }
            FSDataOutputStream out = fs.create(path);
            out.close();
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Build an {@link FsPermission} from a permissions mask.
     * <p>
     * Two forms are accepted: three octal digits (user, group, other), for example <code>755</code>, or a
     * ten character string that is handed to {@link FsPermission#valueOf(String)}.
     *
     * @param permissions permissions mask
     * @param path path the mask is meant for; only used in the error message
     * @return the permission described by the mask
     * @throws ActionExecutorException FS010 if the mask is neither three octal digits nor ten characters long
     */
    FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
        if (permissions.length() == 3) {
            // Reject anything that is not an octal digit up front, so that a bad mask is reported as FS010
            // instead of failing later in the number parsing or yielding an unintended permission.
            for (int i = 0; i < permissions.length(); i++) {
                char digit = permissions.charAt(i);
                if (digit < '0' || digit > '7') {
                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
                                                      "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
                }
            }
            short omask = Short.parseShort(permissions, 8);
            return new FsPermission(omask);
        }
        if (permissions.length() == 10) {
            return FsPermission.valueOf(permissions);
        }
        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
                                          "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
    }

    /**
     * Does nothing.
     */
    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
    }

    /**
     * Does nothing.
     */
    @Override
    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
    }

    /**
     * Parse the action configuration and run its file system commands, then record "OK" as execution data.
     *
     * @param context executor context
     * @param action action whose configuration holds the commands
     * @throws ActionExecutorException if parsing or any command fails
     */
    @Override
    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
        try {
            context.setStartData("-", "-", "-");
            Element actionXml = XmlUtils.parseXml(action.getConf());
            doOperations(context, actionXml);
            context.setExecutionData("OK", null);
        }
        catch (Exception ex) {
            throw convertException(ex);
        }
    }

    /**
     * Set the final status of the action from its external status and, unless
     * <code>oozie.action.keep.action.dir</code> is set to true in the proto action configuration, delete the
     * action directory.
     *
     * @param context executor context
     * @param action action being ended
     * @throws ActionExecutorException if the action directory could not be deleted
     */
    @Override
    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
        String externalStatus = action.getExternalStatus();
        // Constant-first comparison: a missing external status is reported as ERROR instead of raising a
        // NullPointerException.
        WorkflowAction.Status status = "OK".equals(externalStatus) ? WorkflowAction.Status.OK :
                                       WorkflowAction.Status.ERROR;
        context.setEndData(status, getActionSignal(status));
        if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
            try {
                FileSystem fs = context.getAppFileSystem();
                fs.delete(context.getActionDir(), true);
            }
            catch (Exception ex) {
                throw convertException(ex);
            }
        }
    }

    /**
     * Always <code>true</code>, whatever the external status.
     */
    @Override
    public boolean isCompleted(String externalStatus) {
        return true;
    }

    /**
     * Return the recovery marker path of the action: a child of the action directory named after the recovery id
     * with an <code>fs-</code> prefix.
     *
     * @param context executor context providing the action directory and the recovery id
     * @return the recovery path
     * @throws HadoopAccessorException
     * @throws IOException
     * @throws URISyntaxException
     */
    public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
        return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
    }

}