001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.action.hadoop; 019 020import java.io.IOException; 021import java.net.URISyntaxException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026 027import org.apache.hadoop.fs.FSDataOutputStream; 028import org.apache.hadoop.fs.FileStatus; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.FileUtil; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.fs.permission.FsPermission; 033import org.apache.hadoop.mapred.JobConf; 034import org.apache.oozie.action.ActionExecutor; 035import org.apache.oozie.action.ActionExecutorException; 036import org.apache.oozie.client.WorkflowAction; 037import org.apache.oozie.service.HadoopAccessorException; 038import org.apache.oozie.service.HadoopAccessorService; 039import org.apache.oozie.service.Services; 040import org.apache.oozie.util.XConfiguration; 041import org.apache.oozie.util.XmlUtils; 042import org.jdom.Element; 043 044/** 045 * File system action executor. <p/> This executes the file system mkdir, move and delete commands 046 */ 047public class FsActionExecutor extends ActionExecutor { 048 049 private final int maxGlobCount; 050 051 public FsActionExecutor() { 052 super("fs"); 053 maxGlobCount = getOozieConf().getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, 054 LauncherMapper.GLOB_MAX_DEFAULT); 055 } 056 057 Path getPath(Element element, String attribute) { 058 String str = element.getAttributeValue(attribute).trim(); 059 return new Path(str); 060 } 061 062 void validatePath(Path path, boolean withScheme) throws ActionExecutorException { 063 try { 064 String scheme = path.toUri().getScheme(); 065 if (withScheme) { 066 if (scheme == null) { 067 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001", 068 "Missing scheme in path [{0}]", path); 069 } 070 else { 071 Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri()); 072 } 073 } 074 else { 075 if (scheme != null) { 076 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002", 077 "Scheme [{0}] not allowed in path [{1}]", scheme, path); 078 } 079 } 080 } 081 catch (HadoopAccessorException hex) { 082 throw convertException(hex); 083 } 084 } 085 086 Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException { 087 Path fullPath; 088 089 // If no nameNode is given, validate the path as-is and return it as-is 090 if (nameNode == null) { 091 validatePath(path, withScheme); 092 fullPath = path; 093 } else { 094 // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier 095 String pathScheme = path.toUri().getScheme(); 096 String pathAuthority = path.toUri().getAuthority(); 097 if (pathScheme == null || pathAuthority == null) { 098 if (path.isAbsolute()) { 099 String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority(); 100 fullPath = new Path(nameNodeSchemeAuthority + path.toString()); 101 } else { 102 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011", 103 "Path [{0}] cannot be relative", path); 104 } 105 } else { 106 // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is 107 // If it is the nameNode, then it should have already been verified earlier so return it as-is 108 if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) { 109 validatePath(path, withScheme); 110 } 111 fullPath = path; 112 } 113 } 114 return fullPath; 115 } 116 117 void validateSameNN(Path source, Path dest) throws ActionExecutorException { 118 Path destPath = new Path(source, dest); 119 String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority(); 120 String s = source.toUri().getScheme() + source.toUri().getAuthority(); 121 122 //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN 123 if(!t.equals(s)) { 124 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007", 125 "move, target NN URI different from that of source", dest); 126 } 127 } 128 129 @SuppressWarnings("unchecked") 130 void doOperations(Context context, Element element) throws ActionExecutorException { 131 try { 132 FileSystem fs = context.getAppFileSystem(); 133 boolean recovery = fs.exists(getRecoveryPath(context)); 134 if (!recovery) { 135 fs.mkdirs(getRecoveryPath(context)); 136 } 137 138 Path nameNodePath = null; 139 Element nameNodeElement = element.getChild("name-node", element.getNamespace()); 140 if (nameNodeElement != null) { 141 String nameNode = nameNodeElement.getTextTrim(); 142 if (nameNode != null) { 143 nameNodePath = new Path(nameNode); 144 // Verify the name node now 145 validatePath(nameNodePath, true); 146 } 147 } 148 149 XConfiguration fsConf = new XConfiguration(); 150 Path appPath = new Path(context.getWorkflow().getAppPath()); 151 // app path could be a file 152 if (fs.isFile(appPath)) { 153 appPath = appPath.getParent(); 154 } 155 JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf); 156 157 for (Element commandElement : (List<Element>) element.getChildren()) { 158 String command = commandElement.getName(); 159 if (command.equals("mkdir")) { 160 Path path = getPath(commandElement, "path"); 161 mkdir(context, fsConf, nameNodePath, path); 162 } 163 else { 164 if (command.equals("delete")) { 165 Path path = getPath(commandElement, "path"); 166 delete(context, fsConf, nameNodePath, path); 167 } 168 else { 169 if (command.equals("move")) { 170 Path source = getPath(commandElement, "source"); 171 Path target = getPath(commandElement, "target"); 172 move(context, fsConf, nameNodePath, source, target, recovery); 173 } 174 else { 175 if (command.equals("chmod")) { 176 Path path = getPath(commandElement, "path"); 177 boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null; 178 String str = commandElement.getAttributeValue("dir-files"); 179 boolean dirFiles = (str == null) || Boolean.parseBoolean(str); 180 String permissionsMask = commandElement.getAttributeValue("permissions").trim(); 181 chmod(context, fsConf, nameNodePath, path, permissionsMask, dirFiles, recursive); 182 } 183 else { 184 if (command.equals("touchz")) { 185 Path path = getPath(commandElement, "path"); 186 touchz(context, fsConf, nameNodePath, path); 187 } 188 else { 189 if (command.equals("chgrp")) { 190 Path path = getPath(commandElement, "path"); 191 boolean recursive = commandElement.getChild("recursive", 192 commandElement.getNamespace()) != null; 193 String group = commandElement.getAttributeValue("group"); 194 String str = commandElement.getAttributeValue("dir-files"); 195 boolean dirFiles = (str == null) || Boolean.parseBoolean(str); 196 chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(), 197 group, dirFiles, recursive); 198 } 199 } 200 } 201 } 202 } 203 } 204 } 205 } 206 catch (Exception ex) { 207 throw convertException(ex); 208 } 209 } 210 211 void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group, 212 boolean dirFiles, boolean recursive) throws ActionExecutorException { 213 214 HashMap<String, String> argsMap = new HashMap<String, String>(); 215 argsMap.put("user", user); 216 argsMap.put("group", group); 217 try { 218 FileSystem fs = getFileSystemFor(path, context, fsConf); 219 path = resolveToFullPath(nameNodePath, path, true); 220 Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path)); 221 if (pathArr == null || pathArr.length == 0) { 222 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chgrp" 223 + ", path(s) that matches [{0}] does not exist", path); 224 } 225 checkGlobMax(pathArr); 226 for (Path p : pathArr) { 227 recursiveFsOperation("chgrp", fs, nameNodePath, p, argsMap, dirFiles, recursive, true); 228 } 229 } 230 catch (Exception ex) { 231 throw convertException(ex); 232 } 233 } 234 235 private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path, 236 Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot) 237 throws ActionExecutorException { 238 239 try { 240 FileStatus pathStatus = fs.getFileStatus(path); 241 List<Path> paths = new ArrayList<Path>(); 242 243 if (dirFiles && pathStatus.isDir()) { 244 if (isRoot) { 245 paths.add(path); 246 } 247 FileStatus[] filesStatus = fs.listStatus(path); 248 for (int i = 0; i < filesStatus.length; i++) { 249 Path p = filesStatus[i].getPath(); 250 paths.add(p); 251 if (recursive && filesStatus[i].isDir()) { 252 recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false); 253 } 254 } 255 } 256 else { 257 paths.add(path); 258 } 259 for (Path p : paths) { 260 doFsOperation(op, fs, p, argsMap); 261 } 262 } 263 catch (Exception ex) { 264 throw convertException(ex); 265 } 266 } 267 268 private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap) 269 throws ActionExecutorException, IOException { 270 if (op.equals("chmod")) { 271 String permissions = argsMap.get("permissions"); 272 FsPermission newFsPermission = createShortPermission(permissions, p); 273 fs.setPermission(p, newFsPermission); 274 } 275 else if (op.equals("chgrp")) { 276 String user = argsMap.get("user"); 277 String group = argsMap.get("group"); 278 fs.setOwner(p, user, group); 279 } 280 } 281 282 /** 283 * @param path 284 * @param context 285 * @param fsConf 286 * @return FileSystem 287 * @throws HadoopAccessorException 288 */ 289 private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException { 290 String user = context.getWorkflow().getUser(); 291 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 292 JobConf conf = has.createJobConf(path.toUri().getAuthority()); 293 XConfiguration.copy(context.getProtoActionConf(), conf); 294 if (fsConf != null) { 295 XConfiguration.copy(fsConf, conf); 296 } 297 return has.createFileSystem(user, path.toUri(), conf); 298 } 299 300 /** 301 * @param path 302 * @param user 303 * @param group 304 * @return FileSystem 305 * @throws HadoopAccessorException 306 */ 307 private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException { 308 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 309 JobConf jobConf = has.createJobConf(path.toUri().getAuthority()); 310 return has.createFileSystem(user, path.toUri(), jobConf); 311 } 312 313 void mkdir(Context context, Path path) throws ActionExecutorException { 314 mkdir(context, null, null, path); 315 } 316 317 void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { 318 try { 319 path = resolveToFullPath(nameNodePath, path, true); 320 FileSystem fs = getFileSystemFor(path, context, fsConf); 321 322 if (!fs.exists(path)) { 323 if (!fs.mkdirs(path)) { 324 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004", 325 "mkdir, path [{0}] could not create directory", path); 326 } 327 } 328 } 329 catch (Exception ex) { 330 throw convertException(ex); 331 } 332 } 333 334 /** 335 * Delete path 336 * 337 * @param context 338 * @param path 339 * @throws ActionExecutorException 340 */ 341 public void delete(Context context, Path path) throws ActionExecutorException { 342 delete(context, null, null, path); 343 } 344 345 /** 346 * Delete path 347 * 348 * @param context 349 * @param fsConf 350 * @param nameNodePath 351 * @param path 352 * @throws ActionExecutorException 353 */ 354 public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { 355 try { 356 path = resolveToFullPath(nameNodePath, path, true); 357 FileSystem fs = getFileSystemFor(path, context, fsConf); 358 Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path)); 359 if (pathArr != null && pathArr.length > 0) { 360 checkGlobMax(pathArr); 361 for (Path p : pathArr) { 362 if (fs.exists(p)) { 363 if (!fs.delete(p, true)) { 364 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 365 "delete, path [{0}] could not delete path", p); 366 } 367 } 368 } 369 } 370 } 371 catch (Exception ex) { 372 throw convertException(ex); 373 } 374 } 375 376 /** 377 * Delete path 378 * 379 * @param user 380 * @param group 381 * @param path 382 * @throws ActionExecutorException 383 */ 384 public void delete(String user, String group, Path path) throws ActionExecutorException { 385 try { 386 validatePath(path, true); 387 FileSystem fs = getFileSystemFor(path, user); 388 389 if (fs.exists(path)) { 390 if (!fs.delete(path, true)) { 391 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 392 "delete, path [{0}] could not delete path", path); 393 } 394 } 395 } 396 catch (Exception ex) { 397 throw convertException(ex); 398 } 399 } 400 401 /** 402 * Move source to target 403 * 404 * @param context 405 * @param source 406 * @param target 407 * @param recovery 408 * @throws ActionExecutorException 409 */ 410 public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException { 411 move(context, null, null, source, target, recovery); 412 } 413 414 /** 415 * Move source to target 416 * 417 * @param context 418 * @param fsConf 419 * @param nameNodePath 420 * @param source 421 * @param target 422 * @param recovery 423 * @throws ActionExecutorException 424 */ 425 public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery) 426 throws ActionExecutorException { 427 try { 428 source = resolveToFullPath(nameNodePath, source, true); 429 validateSameNN(source, target); 430 FileSystem fs = getFileSystemFor(source, context, fsConf); 431 Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(source)); 432 if (( pathArr == null || pathArr.length == 0 ) ){ 433 if (!recovery) { 434 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006", 435 "move, source path [{0}] does not exist", source); 436 } else { 437 return; 438 } 439 } 440 if (pathArr.length > 1 && (!fs.exists(target) || fs.isFile(target))) { 441 if(!recovery) { 442 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS012", 443 "move, could not rename multiple sources to the same target name"); 444 } else { 445 return; 446 } 447 } 448 checkGlobMax(pathArr); 449 for (Path p : pathArr) { 450 if (!fs.rename(p, target) && !recovery) { 451 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008", 452 "move, could not move [{0}] to [{1}]", p, target); 453 } 454 } 455 } 456 catch (Exception ex) { 457 throw convertException(ex); 458 } 459 } 460 461 void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException { 462 chmod(context, null, null, path, permissions, dirFiles, recursive); 463 } 464 465 void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions, 466 boolean dirFiles, boolean recursive) throws ActionExecutorException { 467 468 HashMap<String, String> argsMap = new HashMap<String, String>(); 469 argsMap.put("permissions", permissions); 470 try { 471 FileSystem fs = getFileSystemFor(path, context, fsConf); 472 path = resolveToFullPath(nameNodePath, path, true); 473 Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path)); 474 if (pathArr == null || pathArr.length == 0) { 475 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chmod" 476 + ", path(s) that matches [{0}] does not exist", path); 477 } 478 checkGlobMax(pathArr); 479 for (Path p : pathArr) { 480 recursiveFsOperation("chmod", fs, nameNodePath, p, argsMap, dirFiles, recursive, true); 481 } 482 483 } 484 catch (Exception ex) { 485 throw convertException(ex); 486 } 487 } 488 489 void touchz(Context context, Path path) throws ActionExecutorException { 490 touchz(context, null, null, path); 491 } 492 493 void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { 494 try { 495 path = resolveToFullPath(nameNodePath, path, true); 496 FileSystem fs = getFileSystemFor(path, context, fsConf); 497 498 FileStatus st; 499 if (fs.exists(path)) { 500 st = fs.getFileStatus(path); 501 if (st.isDir()) { 502 throw new Exception(path.toString() + " is a directory"); 503 } else if (st.getLen() != 0) 504 throw new Exception(path.toString() + " must be a zero-length file"); 505 } 506 FSDataOutputStream out = fs.create(path); 507 out.close(); 508 } 509 catch (Exception ex) { 510 throw convertException(ex); 511 } 512 } 513 514 FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException { 515 if (permissions.length() == 3) { 516 char user = permissions.charAt(0); 517 char group = permissions.charAt(1); 518 char other = permissions.charAt(2); 519 int useri = user - '0'; 520 int groupi = group - '0'; 521 int otheri = other - '0'; 522 int mask = useri * 100 + groupi * 10 + otheri; 523 short omask = Short.parseShort(Integer.toString(mask), 8); 524 return new FsPermission(omask); 525 } 526 else { 527 if (permissions.length() == 10) { 528 return FsPermission.valueOf(permissions); 529 } 530 else { 531 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010", 532 "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions); 533 } 534 } 535 } 536 537 @Override 538 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 539 } 540 541 @Override 542 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 543 } 544 545 @Override 546 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 547 try { 548 context.setStartData("-", "-", "-"); 549 Element actionXml = XmlUtils.parseXml(action.getConf()); 550 doOperations(context, actionXml); 551 context.setExecutionData("OK", null); 552 } 553 catch (Exception ex) { 554 throw convertException(ex); 555 } 556 } 557 558 @Override 559 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 560 String externalStatus = action.getExternalStatus(); 561 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK : 562 WorkflowAction.Status.ERROR; 563 context.setEndData(status, getActionSignal(status)); 564 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) { 565 try { 566 FileSystem fs = context.getAppFileSystem(); 567 fs.delete(context.getActionDir(), true); 568 } 569 catch (Exception ex) { 570 throw convertException(ex); 571 } 572 } 573 } 574 575 @Override 576 public boolean isCompleted(String externalStatus) { 577 return true; 578 } 579 580 /** 581 * @param context 582 * @return 583 * @throws HadoopAccessorException 584 * @throws IOException 585 * @throws URISyntaxException 586 */ 587 public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException { 588 return new Path(context.getActionDir(), "fs-" + context.getRecoveryId()); 589 } 590 591 private void checkGlobMax(Path[] pathArr) throws ActionExecutorException { 592 if(pathArr.length > maxGlobCount) { 593 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013", 594 "too many globbed files/dirs to do FS operation"); 595 } 596 } 597 598}