001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.net.URISyntaxException; 023 import java.util.ArrayList; 024 import java.util.HashMap; 025 import java.util.Iterator; 026 import java.util.List; 027 import java.util.Map; 028 029 import org.apache.hadoop.fs.FSDataOutputStream; 030 import org.apache.hadoop.fs.FileStatus; 031 import org.apache.hadoop.fs.FileSystem; 032 import org.apache.hadoop.fs.Path; 033 import org.apache.hadoop.fs.permission.FsPermission; 034 import org.apache.hadoop.mapred.JobConf; 035 import org.apache.oozie.action.ActionExecutor; 036 import org.apache.oozie.action.ActionExecutorException; 037 import org.apache.oozie.client.WorkflowAction; 038 import org.apache.oozie.service.HadoopAccessorException; 039 import org.apache.oozie.service.HadoopAccessorService; 040 import org.apache.oozie.service.Services; 041 import org.apache.oozie.util.XConfiguration; 042 import org.apache.oozie.util.XmlUtils; 043 import org.jdom.Element; 044 045 /** 046 * File system action executor. <p/> This executes the file system mkdir, move and delete commands 047 */ 048 public class FsActionExecutor extends ActionExecutor { 049 050 public FsActionExecutor() { 051 super("fs"); 052 } 053 054 Path getPath(Element element, String attribute) { 055 String str = element.getAttributeValue(attribute).trim(); 056 return new Path(str); 057 } 058 059 void validatePath(Path path, boolean withScheme) throws ActionExecutorException { 060 try { 061 String scheme = path.toUri().getScheme(); 062 if (withScheme) { 063 if (scheme == null) { 064 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001", 065 "Missing scheme in path [{0}]", path); 066 } 067 else { 068 Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri()); 069 } 070 } 071 else { 072 if (scheme != null) { 073 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002", 074 "Scheme [{0}] not allowed in path [{1}]", scheme, path); 075 } 076 } 077 } 078 catch (HadoopAccessorException hex) { 079 throw convertException(hex); 080 } 081 } 082 083 Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException { 084 Path fullPath; 085 086 // If no nameNode is given, validate the path as-is and return it as-is 087 if (nameNode == null) { 088 validatePath(path, withScheme); 089 fullPath = path; 090 } else { 091 // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier 092 String pathScheme = path.toUri().getScheme(); 093 String pathAuthority = path.toUri().getAuthority(); 094 if (pathScheme == null || pathAuthority == null) { 095 if (path.isAbsolute()) { 096 String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority(); 097 fullPath = new Path(nameNodeSchemeAuthority + path.toString()); 098 } else { 099 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011", 100 "Path [{0}] cannot be relative", path); 101 } 102 } else { 103 // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is 104 // If it is the nameNode, then it should have already been verified earlier so return it as-is 105 if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) { 106 validatePath(path, withScheme); 107 } 108 fullPath = path; 109 } 110 } 111 return fullPath; 112 } 113 114 void validateSameNN(Path source, Path dest) throws ActionExecutorException { 115 Path destPath = new Path(source, dest); 116 String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority(); 117 String s = source.toUri().getScheme() + source.toUri().getAuthority(); 118 119 //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN 120 if(!t.equals(s)) { 121 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007", 122 "move, target NN URI different from that of source", dest); 123 } 124 } 125 126 @SuppressWarnings("unchecked") 127 void doOperations(Context context, Element element) throws ActionExecutorException { 128 try { 129 FileSystem fs = context.getAppFileSystem(); 130 boolean recovery = fs.exists(getRecoveryPath(context)); 131 if (!recovery) { 132 fs.mkdirs(getRecoveryPath(context)); 133 } 134 135 Path nameNodePath = null; 136 Element nameNodeElement = element.getChild("name-node", element.getNamespace()); 137 if (nameNodeElement != null) { 138 String nameNode = nameNodeElement.getTextTrim(); 139 if (nameNode != null) { 140 nameNodePath = new Path(nameNode); 141 // Verify the name node now 142 validatePath(nameNodePath, true); 143 } 144 } 145 146 XConfiguration fsConf = new XConfiguration(); 147 Path appPath = new Path(context.getWorkflow().getAppPath()); 148 // app path could be a file 149 if (fs.isFile(appPath)) { 150 appPath = appPath.getParent(); 151 } 152 JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf); 153 154 for (Element commandElement : (List<Element>) element.getChildren()) { 155 String command = commandElement.getName(); 156 if (command.equals("mkdir")) { 157 Path path = getPath(commandElement, "path"); 158 mkdir(context, fsConf, nameNodePath, path); 159 } 160 else { 161 if (command.equals("delete")) { 162 Path path = getPath(commandElement, "path"); 163 delete(context, fsConf, nameNodePath, path); 164 } 165 else { 166 if (command.equals("move")) { 167 Path source = getPath(commandElement, "source"); 168 Path target = getPath(commandElement, "target"); 169 move(context, fsConf, nameNodePath, source, target, recovery); 170 } 171 else { 172 if (command.equals("chmod")) { 173 Path path = getPath(commandElement, "path"); 174 boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null; 175 String str = commandElement.getAttributeValue("dir-files"); 176 boolean dirFiles = (str == null) || Boolean.parseBoolean(str); 177 String permissionsMask = commandElement.getAttributeValue("permissions").trim(); 178 chmod(context, fsConf, nameNodePath, path, permissionsMask, dirFiles, recursive); 179 } 180 else { 181 if (command.equals("touchz")) { 182 Path path = getPath(commandElement, "path"); 183 touchz(context, fsConf, nameNodePath, path); 184 } 185 else { 186 if (command.equals("chgrp")) { 187 Path path = getPath(commandElement, "path"); 188 boolean recursive = commandElement.getChild("recursive", 189 commandElement.getNamespace()) != null; 190 String group = commandElement.getAttributeValue("group"); 191 String str = commandElement.getAttributeValue("dir-files"); 192 boolean dirFiles = (str == null) || Boolean.parseBoolean(str); 193 chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(), 194 group, dirFiles, recursive); 195 } 196 } 197 } 198 } 199 } 200 } 201 } 202 } 203 catch (Exception ex) { 204 throw convertException(ex); 205 } 206 } 207 208 void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group, 209 boolean dirFiles, boolean recursive) throws ActionExecutorException { 210 211 HashMap<String, String> argsMap = new HashMap<String, String>(); 212 argsMap.put("user", user); 213 argsMap.put("group", group); 214 try { 215 FileSystem fs = getFileSystemFor(path, context, fsConf); 216 recursiveFsOperation("chgrp", fs, nameNodePath, path, argsMap, dirFiles, recursive, true); 217 } 218 catch (Exception ex) { 219 throw convertException(ex); 220 } 221 } 222 223 private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path, 224 Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot) 225 throws ActionExecutorException { 226 227 try { 228 path = resolveToFullPath(nameNodePath, path, true); 229 if (!fs.exists(path)) { 230 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", op 231 + ", path [{0}] does not exist", path); 232 } 233 FileStatus pathStatus = fs.getFileStatus(path); 234 List<Path> paths = new ArrayList<Path>(); 235 236 if (dirFiles && pathStatus.isDir()) { 237 if (isRoot) { 238 paths.add(path); 239 } 240 FileStatus[] filesStatus = fs.listStatus(path); 241 for (int i = 0; i < filesStatus.length; i++) { 242 Path p = filesStatus[i].getPath(); 243 paths.add(p); 244 if (recursive && filesStatus[i].isDir()) { 245 recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false); 246 } 247 } 248 } 249 else { 250 paths.add(path); 251 } 252 for (Path p : paths) { 253 doFsOperation(op, fs, p, argsMap); 254 } 255 } 256 catch (Exception ex) { 257 throw convertException(ex); 258 } 259 } 260 261 private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap) 262 throws ActionExecutorException, IOException { 263 if (op.equals("chmod")) { 264 String permissions = argsMap.get("permissions"); 265 FsPermission newFsPermission = createShortPermission(permissions, p); 266 fs.setPermission(p, newFsPermission); 267 } 268 else if (op.equals("chgrp")) { 269 String user = argsMap.get("user"); 270 String group = argsMap.get("group"); 271 fs.setOwner(p, user, group); 272 } 273 } 274 275 /** 276 * @param path 277 * @param context 278 * @param fsConf 279 * @return FileSystem 280 * @throws HadoopAccessorException 281 */ 282 private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException { 283 String user = context.getWorkflow().getUser(); 284 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 285 JobConf conf = has.createJobConf(path.toUri().getAuthority()); 286 XConfiguration.copy(context.getProtoActionConf(), conf); 287 if (fsConf != null) { 288 XConfiguration.copy(fsConf, conf); 289 } 290 return has.createFileSystem(user, path.toUri(), conf); 291 } 292 293 /** 294 * @param path 295 * @param user 296 * @param group 297 * @return FileSystem 298 * @throws HadoopAccessorException 299 */ 300 private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException { 301 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 302 JobConf jobConf = has.createJobConf(path.toUri().getAuthority()); 303 return has.createFileSystem(user, path.toUri(), jobConf); 304 } 305 306 void mkdir(Context context, Path path) throws ActionExecutorException { 307 mkdir(context, null, null, path); 308 } 309 310 void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { 311 try { 312 path = resolveToFullPath(nameNodePath, path, true); 313 FileSystem fs = getFileSystemFor(path, context, fsConf); 314 315 if (!fs.exists(path)) { 316 if (!fs.mkdirs(path)) { 317 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004", 318 "mkdir, path [{0}] could not create directory", path); 319 } 320 } 321 } 322 catch (Exception ex) { 323 throw convertException(ex); 324 } 325 } 326 327 /** 328 * Delete path 329 * 330 * @param context 331 * @param path 332 * @throws ActionExecutorException 333 */ 334 public void delete(Context context, Path path) throws ActionExecutorException { 335 delete(context, null, null, path); 336 } 337 338 /** 339 * Delete path 340 * 341 * @param context 342 * @param fsConf 343 * @param nameNodePath 344 * @param path 345 * @throws ActionExecutorException 346 */ 347 public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { 348 try { 349 path = resolveToFullPath(nameNodePath, path, true); 350 FileSystem fs = getFileSystemFor(path, context, fsConf); 351 352 if (fs.exists(path)) { 353 if (!fs.delete(path, true)) { 354 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 355 "delete, path [{0}] could not delete path", path); 356 } 357 } 358 } 359 catch (Exception ex) { 360 throw convertException(ex); 361 } 362 } 363 364 /** 365 * Delete path 366 * 367 * @param user 368 * @param group 369 * @param path 370 * @throws ActionExecutorException 371 */ 372 public void delete(String user, String group, Path path) throws ActionExecutorException { 373 try { 374 validatePath(path, true); 375 FileSystem fs = getFileSystemFor(path, user); 376 377 if (fs.exists(path)) { 378 if (!fs.delete(path, true)) { 379 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 380 "delete, path [{0}] could not delete path", path); 381 } 382 } 383 } 384 catch (Exception ex) { 385 throw convertException(ex); 386 } 387 } 388 389 /** 390 * Move source to target 391 * 392 * @param context 393 * @param source 394 * @param target 395 * @param recovery 396 * @throws ActionExecutorException 397 */ 398 public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException { 399 move(context, null, null, source, target, recovery); 400 } 401 402 /** 403 * Move source to target 404 * 405 * @param context 406 * @param fsConf 407 * @param nameNodePath 408 * @param source 409 * @param target 410 * @param recovery 411 * @throws ActionExecutorException 412 */ 413 public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery) 414 throws ActionExecutorException { 415 try { 416 source = resolveToFullPath(nameNodePath, source, true); 417 validateSameNN(source, target); 418 FileSystem fs = getFileSystemFor(source, context, fsConf); 419 420 if (!fs.exists(source) && !recovery) { 421 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006", 422 "move, source path [{0}] does not exist", source); 423 } 424 425 if (!fs.rename(source, target) && !recovery) { 426 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008", 427 "move, could not move [{0}] to [{1}]", source, target); 428 } 429 } 430 catch (Exception ex) { 431 throw convertException(ex); 432 } 433 } 434 435 void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException { 436 chmod(context, null, null, path, permissions, dirFiles, recursive); 437 } 438 439 void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions, 440 boolean dirFiles, boolean recursive) throws ActionExecutorException { 441 442 HashMap<String, String> argsMap = new HashMap<String, String>(); 443 argsMap.put("permissions", permissions); 444 try { 445 FileSystem fs = getFileSystemFor(path, context, fsConf); 446 recursiveFsOperation("chmod", fs, nameNodePath, path, argsMap, dirFiles, recursive, true); 447 } 448 catch (Exception ex) { 449 throw convertException(ex); 450 } 451 } 452 453 void touchz(Context context, Path path) throws ActionExecutorException { 454 touchz(context, null, null, path); 455 } 456 457 void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException { 458 try { 459 path = resolveToFullPath(nameNodePath, path, true); 460 FileSystem fs = getFileSystemFor(path, context, fsConf); 461 462 FileStatus st; 463 if (fs.exists(path)) { 464 st = fs.getFileStatus(path); 465 if (st.isDir()) { 466 throw new Exception(path.toString() + " is a directory"); 467 } else if (st.getLen() != 0) 468 throw new Exception(path.toString() + " must be a zero-length file"); 469 } 470 FSDataOutputStream out = fs.create(path); 471 out.close(); 472 } 473 catch (Exception ex) { 474 throw convertException(ex); 475 } 476 } 477 478 FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException { 479 if (permissions.length() == 3) { 480 char user = permissions.charAt(0); 481 char group = permissions.charAt(1); 482 char other = permissions.charAt(2); 483 int useri = user - '0'; 484 int groupi = group - '0'; 485 int otheri = other - '0'; 486 int mask = useri * 100 + groupi * 10 + otheri; 487 short omask = Short.parseShort(Integer.toString(mask), 8); 488 return new FsPermission(omask); 489 } 490 else { 491 if (permissions.length() == 10) { 492 return FsPermission.valueOf(permissions); 493 } 494 else { 495 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010", 496 "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions); 497 } 498 } 499 } 500 501 @Override 502 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 503 } 504 505 @Override 506 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 507 } 508 509 @Override 510 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 511 try { 512 context.setStartData("-", "-", "-"); 513 Element actionXml = XmlUtils.parseXml(action.getConf()); 514 doOperations(context, actionXml); 515 context.setExecutionData("OK", null); 516 } 517 catch (Exception ex) { 518 throw convertException(ex); 519 } 520 } 521 522 @Override 523 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 524 String externalStatus = action.getExternalStatus(); 525 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK : 526 WorkflowAction.Status.ERROR; 527 context.setEndData(status, getActionSignal(status)); 528 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) { 529 try { 530 FileSystem fs = context.getAppFileSystem(); 531 fs.delete(context.getActionDir(), true); 532 } 533 catch (Exception ex) { 534 throw convertException(ex); 535 } 536 } 537 } 538 539 @Override 540 public boolean isCompleted(String externalStatus) { 541 return true; 542 } 543 544 /** 545 * @param context 546 * @return 547 * @throws HadoopAccessorException 548 * @throws IOException 549 * @throws URISyntaxException 550 */ 551 public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException { 552 return new Path(context.getActionDir(), "fs-" + context.getRecoveryId()); 553 } 554 555 }