001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import java.io.IOException; 021 import java.net.URISyntaxException; 022 import java.util.List; 023 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.hadoop.fs.FileStatus; 026 import org.apache.hadoop.fs.FileSystem; 027 import org.apache.hadoop.fs.Path; 028 import org.apache.hadoop.fs.permission.FsPermission; 029 import org.apache.oozie.action.ActionExecutor; 030 import org.apache.oozie.action.ActionExecutorException; 031 import org.apache.oozie.client.WorkflowAction; 032 import org.apache.oozie.service.HadoopAccessorException; 033 import org.apache.oozie.service.HadoopAccessorService; 034 import org.apache.oozie.service.Services; 035 import org.apache.oozie.util.XmlUtils; 036 import org.jdom.Element; 037 038 /** 039 * File system action executor. <p/> This executes the file system mkdir, move and delete commands 040 */ 041 public class FsActionExecutor extends ActionExecutor { 042 043 public FsActionExecutor() { 044 super("fs"); 045 } 046 047 Path getPath(Element element, String attribute) { 048 String str = element.getAttributeValue(attribute).trim(); 049 return new Path(str); 050 } 051 052 void validatePath(Path path, boolean withScheme) throws ActionExecutorException { 053 String scheme = path.toUri().getScheme(); 054 if (withScheme) { 055 if (scheme == null) { 056 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001", 057 "Missing scheme in path [{0}]", path); 058 } 059 else { 060 if (!scheme.equals("hdfs")) { 061 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002", 062 "Scheme [{0}] not supported in path [{1}]", scheme, path); 063 } 064 } 065 } 066 else { 067 if (scheme != null) { 068 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS003", 069 "Scheme [{0}] not allowed in path [{1}]", scheme, path); 070 } 071 } 072 } 073 074 void validateSameNN(Path source, Path dest) throws ActionExecutorException { 075 Path destPath = new Path(source, dest); 076 String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority(); 077 String s = source.toUri().getScheme() + source.toUri().getAuthority(); 078 079 //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN 080 if(!t.equals(s)) { 081 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007", 082 "move, target NN URI different from that of source", dest); 083 } 084 } 085 086 @SuppressWarnings("unchecked") 087 void doOperations(Context context, Element element) throws ActionExecutorException { 088 try { 089 FileSystem fs = context.getAppFileSystem(); 090 boolean recovery = fs.exists(getRecoveryPath(context)); 091 if (!recovery) { 092 fs.mkdirs(getRecoveryPath(context)); 093 } 094 for (Element commandElement : (List<Element>) element.getChildren()) { 095 String command = commandElement.getName(); 096 if (command.equals("mkdir")) { 097 Path path = getPath(commandElement, "path"); 098 mkdir(context, path); 099 } 100 else { 101 if (command.equals("delete")) { 102 Path path = getPath(commandElement, "path"); 103 delete(context, path); 104 } 105 else { 106 if (command.equals("move")) { 107 Path source = getPath(commandElement, "source"); 108 Path target = getPath(commandElement, "target"); 109 move(context, source, target, recovery); 110 } 111 else { 112 if (command.equals("chmod")) { 113 Path path = getPath(commandElement, "path"); 114 String str = commandElement.getAttributeValue("dir-files"); 115 boolean dirFiles = (str == null) || Boolean.parseBoolean(str); 116 String permissionsMask = commandElement.getAttributeValue("permissions").trim(); 117 chmod(context, path, permissionsMask, dirFiles); 118 } 119 } 120 } 121 } 122 } 123 } 124 catch (Exception ex) { 125 throw convertException(ex); 126 } 127 } 128 129 /** 130 * @param path 131 * @param context 132 * @return FileSystem 133 * @throws HadoopAccessorException 134 */ 135 private FileSystem getFileSystemFor(Path path, Context context) throws HadoopAccessorException { 136 String user = context.getWorkflow().getUser(); 137 String group = context.getWorkflow().getGroup(); 138 return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(), 139 new Configuration()); 140 } 141 142 /** 143 * @param path 144 * @param user 145 * @param group 146 * @return FileSystem 147 * @throws HadoopAccessorException 148 */ 149 private FileSystem getFileSystemFor(Path path, String user, String group) throws HadoopAccessorException { 150 return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(), 151 new Configuration()); 152 } 153 154 void mkdir(Context context, Path path) throws ActionExecutorException { 155 try { 156 validatePath(path, true); 157 FileSystem fs = getFileSystemFor(path, context); 158 159 if (!fs.exists(path)) { 160 if (!fs.mkdirs(path)) { 161 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004", 162 "mkdir, path [{0}] could not create directory", path); 163 } 164 } 165 } 166 catch (Exception ex) { 167 throw convertException(ex); 168 } 169 } 170 171 /** 172 * Delete path 173 * 174 * @param context 175 * @param path 176 * @throws ActionExecutorException 177 */ 178 public void delete(Context context, Path path) throws ActionExecutorException { 179 try { 180 validatePath(path, true); 181 FileSystem fs = getFileSystemFor(path, context); 182 183 if (fs.exists(path)) { 184 if (!fs.delete(path, true)) { 185 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 186 "delete, path [{0}] could not delete path", path); 187 } 188 } 189 } 190 catch (Exception ex) { 191 throw convertException(ex); 192 } 193 } 194 195 /** 196 * Delete path 197 * 198 * @param user 199 * @param group 200 * @param path 201 * @throws ActionExecutorException 202 */ 203 public void delete(String user, String group, Path path) throws ActionExecutorException { 204 try { 205 validatePath(path, true); 206 FileSystem fs = getFileSystemFor(path, user, group); 207 208 if (fs.exists(path)) { 209 if (!fs.delete(path, true)) { 210 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005", 211 "delete, path [{0}] could not delete path", path); 212 } 213 } 214 } 215 catch (Exception ex) { 216 throw convertException(ex); 217 } 218 } 219 220 /** 221 * Move source to target 222 * 223 * @param context 224 * @param source 225 * @param target 226 * @param recovery 227 * @throws ActionExecutorException 228 */ 229 public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException { 230 try { 231 validatePath(source, true); 232 validateSameNN(source, target); 233 FileSystem fs = getFileSystemFor(source, context); 234 235 if (!fs.exists(source) && !recovery) { 236 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006", 237 "move, source path [{0}] does not exist", source); 238 } 239 240 if (!fs.rename(source, target) && !recovery) { 241 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008", 242 "move, could not move [{0}] to [{1}]", source, target); 243 } 244 } 245 catch (Exception ex) { 246 throw convertException(ex); 247 } 248 } 249 250 void chmod(Context context, Path path, String permissions, boolean dirFiles) throws ActionExecutorException { 251 try { 252 validatePath(path, true); 253 FileSystem fs = getFileSystemFor(path, context); 254 255 if (!fs.exists(path)) { 256 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", 257 "chmod, path [{0}] does not exist", path); 258 } 259 260 FileStatus pathStatus = fs.getFileStatus(path); 261 262 Path[] paths; 263 if (dirFiles && pathStatus.isDir()) { 264 FileStatus[] filesStatus = fs.listStatus(path); 265 paths = new Path[filesStatus.length]; 266 for (int i = 0; i < filesStatus.length; i++) { 267 paths[i] = filesStatus[i].getPath(); 268 } 269 } 270 else { 271 paths = new Path[]{path}; 272 } 273 274 FsPermission newFsPermission = createShortPermission(permissions, path); 275 fs.setPermission(path, newFsPermission); 276 for (Path p : paths) { 277 fs.setPermission(p, newFsPermission); 278 } 279 } 280 catch (Exception ex) { 281 throw convertException(ex); 282 } 283 } 284 285 FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException { 286 if (permissions.length() == 3) { 287 char user = permissions.charAt(0); 288 char group = permissions.charAt(1); 289 char other = permissions.charAt(2); 290 int useri = user - '0'; 291 int groupi = group - '0'; 292 int otheri = other - '0'; 293 int mask = useri * 100 + groupi * 10 + otheri; 294 short omask = Short.parseShort(Integer.toString(mask), 8); 295 return new FsPermission(omask); 296 } 297 else { 298 if (permissions.length() == 10) { 299 return FsPermission.valueOf(permissions); 300 } 301 else { 302 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010", 303 "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions); 304 } 305 } 306 } 307 308 @Override 309 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 310 } 311 312 @Override 313 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 314 } 315 316 @Override 317 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 318 try { 319 context.setStartData("-", "-", "-"); 320 Element actionXml = XmlUtils.parseXml(action.getConf()); 321 doOperations(context, actionXml); 322 context.setExecutionData("OK", null); 323 } 324 catch (Exception ex) { 325 throw convertException(ex); 326 } 327 } 328 329 @Override 330 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 331 String externalStatus = action.getExternalStatus(); 332 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK : 333 WorkflowAction.Status.ERROR; 334 context.setEndData(status, getActionSignal(status)); 335 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) { 336 try { 337 FileSystem fs = context.getAppFileSystem(); 338 fs.delete(context.getActionDir(), true); 339 } 340 catch (Exception ex) { 341 throw convertException(ex); 342 } 343 } 344 } 345 346 @Override 347 public boolean isCompleted(String externalStatus) { 348 return true; 349 } 350 351 /** 352 * @param context 353 * @return 354 * @throws HadoopAccessorException 355 * @throws IOException 356 * @throws URISyntaxException 357 */ 358 public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException { 359 return new Path(context.getActionDir(), "fs-" + context.getRecoveryId()); 360 } 361 362 }