001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.hadoop; 020 021import java.io.IOException; 022import java.net.URI; 023import java.net.URISyntaxException; 024import java.security.PrivilegedExceptionAction; 025import java.util.ArrayList; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029 030import org.apache.hadoop.fs.FSDataOutputStream; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.FileUtil; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.fs.Trash; 036import org.apache.hadoop.fs.permission.FsPermission; 037import org.apache.hadoop.mapred.JobConf; 038import org.apache.hadoop.security.AccessControlException; 039import org.apache.hadoop.security.UserGroupInformation; 040import org.apache.oozie.action.ActionExecutor; 041import org.apache.oozie.action.ActionExecutorException; 042import org.apache.oozie.client.OozieClient; 043import org.apache.oozie.client.WorkflowAction; 044import org.apache.oozie.dependency.FSURIHandler; 045import org.apache.oozie.dependency.URIHandler; 046import org.apache.oozie.service.ConfigurationService; 047import org.apache.oozie.service.HadoopAccessorException; 048import org.apache.oozie.service.HadoopAccessorService; 049import org.apache.oozie.service.Services; 050import org.apache.oozie.service.UserGroupInformationService; 051import org.apache.oozie.service.URIHandlerService; 052import org.apache.oozie.util.XConfiguration; 053import org.apache.oozie.util.XmlUtils; 054import org.jdom.Element; 055 056/** 057 * File system action executor. <p> This executes the file system mkdir, move and delete commands 058 */ 059public class FsActionExecutor extends ActionExecutor { 060 061 public static final String ACTION_TYPE = "fs"; 062 063 private final int maxGlobCount; 064 065 public FsActionExecutor() { 066 super(ACTION_TYPE); 067 maxGlobCount = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX); 068 } 069 070 /** 071 * Initialize Action. 072 */ 073 @Override 074 public void initActionType() { 075 super.initActionType(); 076 registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.ERROR, "FS014"); 077 } 078 079 Path getPath(Element element, String attribute) { 080 String str = element.getAttributeValue(attribute).trim(); 081 return new Path(str); 082 } 083 084 void validatePath(Path path, boolean withScheme) throws ActionExecutorException { 085 try { 086 String scheme = path.toUri().getScheme(); 087 if (withScheme) { 088 if (scheme == null) { 089 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001", 090 "Missing scheme in path [{0}]", path); 091 } 092 else { 093 Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri()); 094 } 095 } 096 else { 097 if (scheme != null) { 098 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002", 099 "Scheme [{0}] not allowed in path [{1}]", scheme, path); 100 } 101 } 102 } 103 catch (HadoopAccessorException hex) { 104 throw convertException(hex); 105 } 106 } 107 108 Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException { 109 Path fullPath; 110 111 // If no nameNode is given, validate the path as-is and return it as-is 112 if (nameNode == null) { 113 validatePath(path, withScheme); 114 fullPath = path; 115 } else { 116 // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier 117 String pathScheme = path.toUri().getScheme(); 118 String pathAuthority = path.toUri().getAuthority(); 119 if (pathScheme == null || pathAuthority == null) { 120 if (path.isAbsolute()) { 121 String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority(); 122 fullPath = new Path(nameNodeSchemeAuthority + path.toString()); 123 } else { 124 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011", 125 "Path [{0}] cannot be relative", path); 126 } 127 } else { 128 // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is 129 // If it is the nameNode, then it should have already been verified earlier so return it as-is 130 if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) { 131 validatePath(path, withScheme); 132 } 133 fullPath = path; 134 } 135 } 136 return fullPath; 137 } 138 139 void validateSameNN(Path source, Path dest) throws ActionExecutorException { 140 Path destPath = new Path(source, dest); 141 String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority(); 142 String s = source.toUri().getScheme() + source.toUri().getAuthority(); 143 144 //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN 145 if(!t.equals(s)) { 146 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007", 147 "move, target NN URI different from that of source", dest); 148 } 149 } 150 151 @SuppressWarnings("unchecked") 152 void doOperations(Context context, Element element) throws ActionExecutorException { 153 try { 154 FileSystem fs = context.getAppFileSystem(); 155 boolean recovery = fs.exists(getRecoveryPath(context)); 156 if (!recovery) { 157 fs.mkdirs(getRecoveryPath(context)); 158 } 159 160 Path nameNodePath = null; 161 Element nameNodeElement = element.getChild("name-node", element.getNamespace()); 162 if (nameNodeElement != null) { 163 String nameNode = nameNodeElement.getTextTrim(); 164 if (nameNode != null) { 165 nameNodePath = new Path(nameNode); 166 // Verify the name node now 167 validatePath(nameNodePath, true); 168 } 169 } 170 171 XConfiguration fsConf = new XConfiguration(); 172 Path appPath = new Path(context.getWorkflow().getAppPath()); 173 // app path could be a file 174 if (fs.isFile(appPath)) { 175 appPath = appPath.getParent(); 176 } 177 JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf); 178 179 for (Element commandElement : (List<Element>) element.getChildren()) { 180 String command = commandElement.getName(); 181 if (command.equals("mkdir")) { 182 Path path = getPath(commandElement, "path"); 183 mkdir(context, fsConf, nameNodePath, path); 184 } 185 else { 186 if (command.equals("delete")) { 187 Path path = getPath(commandElement, "path"); 188 boolean skipTrash = true; 189 if (commandElement.getAttributeValue("skip-trash") != null && 190 commandElement.getAttributeValue("skip-trash").equals("false")) { 191 skipTrash = false; 192 } 193 delete(context, fsConf, nameNodePath, path, skipTrash); 194 } 195 else { 196 if (command.equals("move")) { 197 Path source = getPath(commandElement, "source"); 198 Path target = getPath(commandElement, "target"); 199 move(context, fsConf, nameNodePath, source, target, recovery); 200 } 201 else { 202 if (command.equals("chmod")) { 203 Path path = getPath(commandElement, "path"); 204 boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null; 205 String str = commandElement.getAttributeValue("dir-files"); 206 boolean dirFiles = (str == null) || Boolean.parseBoolean(str); 207 String permissionsMask = commandElement.getAttributeValue("permissions").trim(); 208 chmod(context, fsConf, nameNodePath, path, permissionsMask, dirFiles, recursive); 209 } 210 else { 211 if (command.equals("touchz")) { 212 Path path = getPath(commandElement, "path"); 213 touchz(context, fsConf, nameNodePath, path); 214 } 215 else { 216 if (command.equals("chgrp")) { 217 Path path = getPath(commandElement, "path"); 218 boolean recursive = commandElement.getChild("recursive", 219 commandElement.getNamespace()) != null; 220 String group = commandElement.getAttributeValue("group"); 221 String str = commandElement.getAttributeValue("dir-files"); 222 boolean dirFiles = (str == null) || Boolean.parseBoolean(str); 223 chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(), 224 group, dirFiles, recursive); 225 } 226 } 227 } 228 } 229 } 230 } 231 } 232 } 233 catch (Exception ex) { 234 throw convertException(ex); 235 } 236 } 237 238 void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group, 239 boolean dirFiles, boolean recursive) throws ActionExecutorException { 240 241 HashMap<String, String> argsMap = new HashMap<String, String>(); 242 argsMap.put("user", user); 243 argsMap.put("group", group); 244 try { 245 FileSystem fs = getFileSystemFor(path, context, fsConf); 246 path = resolveToFullPath(nameNodePath, path, true); 247 Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path)); 248 if (pathArr == null || pathArr.length == 0) { 249 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chgrp" 250 + ", path(s) that matches [{0}] does not exist", path); 251 } 252 checkGlobMax(pathArr); 253 for (Path p : pathArr) { 254 recursiveFsOperation("chgrp", fs, nameNodePath, p, argsMap, dirFiles, recursive, true); 255 } 256 } 257 catch (Exception ex) { 258 throw convertException(ex); 259 } 260 } 261 262 private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path, 263 Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot) 264 throws ActionExecutorException { 265 266 try { 267 FileStatus pathStatus = fs.getFileStatus(path); 268 List<Path> paths = new ArrayList<Path>(); 269 270 if (dirFiles && pathStatus.isDir()) { 271 if (isRoot) { 272 paths.add(path); 273 } 274 FileStatus[] filesStatus = fs.listStatus(path); 275 for (int i = 0; i < filesStatus.length; i++) { 276 Path p = filesStatus[i].getPath(); 277 paths.add(p); 278 if (recursive && filesStatus[i].isDir()) { 279 recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false); 280 } 281 } 282 } 283 else { 284 paths.add(path); 285 } 286 for (Path p : paths) { 287 doFsOperation(op, fs, p, argsMap); 288 } 289 } 290 catch (Exception ex) { 291 throw convertException(ex); 292 } 293 } 294 295 private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap) 296 throws ActionExecutorException, IOException { 297 if (op.equals("chmod")) { 298 String permissions = argsMap.get("permissions"); 299 FsPermission newFsPermission = createShortPermission(permissions, p); 300 fs.setPermission(p, newFsPermission); 301 } 302 else if (op.equals("chgrp")) { 303 String user = argsMap.get("user"); 304 String group = argsMap.get("group"); 305 fs.setOwner(p, user, group); 306 } 307 } 308 309 /** 310 * @param path 311 * @param context 312 * @param fsConf 313 * @return FileSystem 314 * @throws HadoopAccessorException 315 */ 316 private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException { 317 String user = context.getWorkflow().getUser(); 318 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 319 JobConf conf = has.createJobConf(path.toUri().getAuthority()); 320 XConfiguration.copy(context.getProtoActionConf(), conf); 321 if (fsConf != null) { 322 XConfiguration.copy(fsConf, conf); 323 } 324 return has.createFileSystem(user, path.toUri(), conf); 325 } 326 327 /** 328 * @param path 329 * @param user 330 * @return FileSystem 331 * @throws HadoopAccessorException 332 */ 333 private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException { 334 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 335 JobConf jobConf = has.createJobConf(path.toUri().getAuthority()); 336 return has.createFileSystem(user, path.toUri(), jobConf); 337 } 338 339 void mkdir(Context context, Path path) throws ActionExecutorException { 340 mkdir(context, null, null, path); 341 } 342 343 void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { 344 try { 345 path = resolveToFullPath(nameNodePath, path, true); 346 FileSystem fs = getFileSystemFor(path, context, fsConf); 347 348 if (!fs.exists(path)) { 349 if (!fs.mkdirs(path)) { 350 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004", 351 "mkdir, path [{0}] could not create directory", path); 352 } 353 } 354 } 355 catch (Exception ex) { 356 throw convertException(ex); 357 } 358 } 359 360 /** 361 * Delete path 362 * 363 * @param context 364 * @param path 365 * @throws ActionExecutorException 366 */ 367 public void delete(Context context, Path path) throws ActionExecutorException { 368 delete(context, null, null, path, true); 369 } 370 371 /** 372 * Delete path 373 * 374 * @param context 375 * @param fsConf 376 * @param nameNodePath 377 * @param path 378 * @throws ActionExecutorException 379 */ 380 public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path, boolean skipTrash) 381 throws ActionExecutorException { 382 URI uri = path.toUri(); 383 URIHandler handler; 384 org.apache.oozie.dependency.URIHandler.Context hcatContext = null; 385 try { 386 handler = Services.get().get(URIHandlerService.class).getURIHandler(uri); 387 if (handler instanceof FSURIHandler) { 388 // Use legacy code to handle hdfs partition deletion 389 path = resolveToFullPath(nameNodePath, path, true); 390 final FileSystem fs = getFileSystemFor(path, context, fsConf); 391 Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path)); 392 if (pathArr != null && pathArr.length > 0) { 393 checkGlobMax(pathArr); 394 for (final Path p : pathArr) { 395 if (fs.exists(p)) { 396 if (!skipTrash) { 397 // Moving directory/file to trash of user. 398 UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class); 399 UserGroupInformation ugi = ugiService.getProxyUser(fs.getConf().get(OozieClient.USER_NAME)); 400 ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { 401 @Override 402 public FileSystem run() throws Exception { 403 Trash trash = new Trash(fs.getConf()); 404 if (!trash.moveToTrash(p)) { 405 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 406 "Could not move path [{0}] to trash on delete", p); 407 } 408 return null; 409 } 410 }); 411 } 412 else if (!fs.delete(p, true)) { 413 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 414 "delete, path [{0}] could not delete path", p); 415 } 416 } 417 } 418 } 419 } else { 420 hcatContext = handler.getContext(uri, fsConf, context.getWorkflow().getUser(), false); 421 handler.delete(uri, hcatContext); 422 } 423 } 424 catch (Exception ex) { 425 throw convertException(ex); 426 } 427 finally{ 428 if (hcatContext != null) { 429 hcatContext.destroy(); 430 } 431 } 432 } 433 434 /** 435 * Delete path 436 * 437 * @param user 438 * @param group 439 * @param path 440 * @throws ActionExecutorException 441 */ 442 public void delete(String user, String group, Path path) throws ActionExecutorException { 443 try { 444 validatePath(path, true); 445 FileSystem fs = getFileSystemFor(path, user); 446 447 if (fs.exists(path)) { 448 if (!fs.delete(path, true)) { 449 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 450 "delete, path [{0}] could not delete path", path); 451 } 452 } 453 } 454 catch (Exception ex) { 455 throw convertException(ex); 456 } 457 } 458 459 /** 460 * Move source to target 461 * 462 * @param context 463 * @param source 464 * @param target 465 * @param recovery 466 * @throws ActionExecutorException 467 */ 468 public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException { 469 move(context, null, null, source, target, recovery); 470 } 471 472 /** 473 * Move source to target 474 * 475 * @param context 476 * @param fsConf 477 * @param nameNodePath 478 * @param source 479 * @param target 480 * @param recovery 481 * @throws ActionExecutorException 482 */ 483 public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery) 484 throws ActionExecutorException { 485 try { 486 source = resolveToFullPath(nameNodePath, source, true); 487 validateSameNN(source, target); 488 FileSystem fs = getFileSystemFor(source, context, fsConf); 489 Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(source)); 490 if (( pathArr == null || pathArr.length == 0 ) ){ 491 if (!recovery) { 492 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006", 493 "move, source path [{0}] does not exist", source); 494 } else { 495 return; 496 } 497 } 498 if (pathArr.length > 1 && (!fs.exists(target) || fs.isFile(target))) { 499 if(!recovery) { 500 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS012", 501 "move, could not rename multiple sources to the same target name"); 502 } else { 503 return; 504 } 505 } 506 checkGlobMax(pathArr); 507 for (Path p : pathArr) { 508 if (!fs.rename(p, target) && !recovery) { 509 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008", 510 "move, could not move [{0}] to [{1}]", p, target); 511 } 512 } 513 } 514 catch (Exception ex) { 515 throw convertException(ex); 516 } 517 } 518 519 void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException { 520 chmod(context, null, null, path, permissions, dirFiles, recursive); 521 } 522 523 void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions, 524 boolean dirFiles, boolean recursive) throws ActionExecutorException { 525 526 HashMap<String, String> argsMap = new HashMap<String, String>(); 527 argsMap.put("permissions", permissions); 528 try { 529 FileSystem fs = getFileSystemFor(path, context, fsConf); 530 path = resolveToFullPath(nameNodePath, path, true); 531 Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path)); 532 if (pathArr == null || pathArr.length == 0) { 533 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chmod" 534 + ", path(s) that matches [{0}] does not exist", path); 535 } 536 checkGlobMax(pathArr); 537 for (Path p : pathArr) { 538 recursiveFsOperation("chmod", fs, nameNodePath, p, argsMap, dirFiles, recursive, true); 539 } 540 541 } 542 catch (Exception ex) { 543 throw convertException(ex); 544 } 545 } 546 547 void touchz(Context context, Path path) throws ActionExecutorException { 548 touchz(context, null, null, path); 549 } 550 551 void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { 552 try { 553 path = resolveToFullPath(nameNodePath, path, true); 554 FileSystem fs = getFileSystemFor(path, context, fsConf); 555 556 FileStatus st; 557 if (fs.exists(path)) { 558 st = fs.getFileStatus(path); 559 if (st.isDir()) { 560 throw new Exception(path.toString() + " is a directory"); 561 } else if (st.getLen() != 0) { 562 throw new Exception(path.toString() + " must be a zero-length file"); 563 } 564 } 565 FSDataOutputStream out = fs.create(path); 566 out.close(); 567 } 568 catch (Exception ex) { 569 throw convertException(ex); 570 } 571 } 572 573 FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException { 574 if (permissions.length() == 3) { 575 char user = permissions.charAt(0); 576 char group = permissions.charAt(1); 577 char other = permissions.charAt(2); 578 int useri = user - '0'; 579 int groupi = group - '0'; 580 int otheri = other - '0'; 581 int mask = useri * 100 + groupi * 10 + otheri; 582 short omask = Short.parseShort(Integer.toString(mask), 8); 583 return new FsPermission(omask); 584 } 585 else { 586 if (permissions.length() == 10) { 587 return FsPermission.valueOf(permissions); 588 } 589 else { 590 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010", 591 "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions); 592 } 593 } 594 } 595 596 @Override 597 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 598 } 599 600 @Override 601 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 602 } 603 604 @Override 605 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 606 try { 607 context.setStartData("-", "-", "-"); 608 Element actionXml = XmlUtils.parseXml(action.getConf()); 609 doOperations(context, actionXml); 610 context.setExecutionData("OK", null); 611 } 612 catch (Exception ex) { 613 throw convertException(ex); 614 } 615 } 616 617 @Override 618 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 619 String externalStatus = action.getExternalStatus(); 620 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK : 621 WorkflowAction.Status.ERROR; 622 context.setEndData(status, getActionSignal(status)); 623 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) { 624 try { 625 FileSystem fs = context.getAppFileSystem(); 626 fs.delete(context.getActionDir(), true); 627 } 628 catch (Exception ex) { 629 throw convertException(ex); 630 } 631 } 632 } 633 634 @Override 635 public boolean isCompleted(String externalStatus) { 636 return true; 637 } 638 639 /** 640 * @param context 641 * @return 642 * @throws HadoopAccessorException 643 * @throws IOException 644 * @throws URISyntaxException 645 */ 646 public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException { 647 return new Path(context.getActionDir(), "fs-" + context.getRecoveryId()); 648 } 649 650 private void checkGlobMax(Path[] pathArr) throws ActionExecutorException { 651 if(pathArr.length > maxGlobCount) { 652 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013", 653 "too many globbed files/dirs to do FS operation"); 654 } 655 } 656 657 public boolean supportsConfigurationJobXML() { 658 return true; 659 } 660}