This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.URISyntaxException;
023 import java.util.ArrayList;
024 import java.util.HashMap;
025 import java.util.Iterator;
026 import java.util.List;
027 import java.util.Map;
028
029 import org.apache.hadoop.fs.FSDataOutputStream;
030 import org.apache.hadoop.fs.FileStatus;
031 import org.apache.hadoop.fs.FileSystem;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.hadoop.fs.permission.FsPermission;
034 import org.apache.hadoop.mapred.JobConf;
035 import org.apache.oozie.action.ActionExecutor;
036 import org.apache.oozie.action.ActionExecutorException;
037 import org.apache.oozie.client.WorkflowAction;
038 import org.apache.oozie.service.HadoopAccessorException;
039 import org.apache.oozie.service.HadoopAccessorService;
040 import org.apache.oozie.service.Services;
041 import org.apache.oozie.util.XConfiguration;
042 import org.apache.oozie.util.XmlUtils;
043 import org.jdom.Element;
044
045 /**
046 * File system action executor. <p/> This executes the file system mkdir, move and delete commands
047 */
048 public class FsActionExecutor extends ActionExecutor {
049
050 public FsActionExecutor() {
051 super("fs");
052 }
053
054 Path getPath(Element element, String attribute) {
055 String str = element.getAttributeValue(attribute).trim();
056 return new Path(str);
057 }
058
059 void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
060 try {
061 String scheme = path.toUri().getScheme();
062 if (withScheme) {
063 if (scheme == null) {
064 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
065 "Missing scheme in path [{0}]", path);
066 }
067 else {
068 Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri());
069 }
070 }
071 else {
072 if (scheme != null) {
073 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
074 "Scheme [{0}] not allowed in path [{1}]", scheme, path);
075 }
076 }
077 }
078 catch (HadoopAccessorException hex) {
079 throw convertException(hex);
080 }
081 }
082
083 Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
084 Path fullPath;
085
086 // If no nameNode is given, validate the path as-is and return it as-is
087 if (nameNode == null) {
088 validatePath(path, withScheme);
089 fullPath = path;
090 } else {
091 // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier
092 String pathScheme = path.toUri().getScheme();
093 String pathAuthority = path.toUri().getAuthority();
094 if (pathScheme == null || pathAuthority == null) {
095 if (path.isAbsolute()) {
096 String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
097 fullPath = new Path(nameNodeSchemeAuthority + path.toString());
098 } else {
099 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011",
100 "Path [{0}] cannot be relative", path);
101 }
102 } else {
103 // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is
104 // If it is the nameNode, then it should have already been verified earlier so return it as-is
105 if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) {
106 validatePath(path, withScheme);
107 }
108 fullPath = path;
109 }
110 }
111 return fullPath;
112 }
113
114 void validateSameNN(Path source, Path dest) throws ActionExecutorException {
115 Path destPath = new Path(source, dest);
116 String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
117 String s = source.toUri().getScheme() + source.toUri().getAuthority();
118
119 //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
120 if(!t.equals(s)) {
121 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
122 "move, target NN URI different from that of source", dest);
123 }
124 }
125
126 @SuppressWarnings("unchecked")
127 void doOperations(Context context, Element element) throws ActionExecutorException {
128 try {
129 FileSystem fs = context.getAppFileSystem();
130 boolean recovery = fs.exists(getRecoveryPath(context));
131 if (!recovery) {
132 fs.mkdirs(getRecoveryPath(context));
133 }
134
135 Path nameNodePath = null;
136 Element nameNodeElement = element.getChild("name-node", element.getNamespace());
137 if (nameNodeElement != null) {
138 String nameNode = nameNodeElement.getTextTrim();
139 if (nameNode != null) {
140 nameNodePath = new Path(nameNode);
141 // Verify the name node now
142 validatePath(nameNodePath, true);
143 }
144 }
145
146 XConfiguration fsConf = new XConfiguration();
147 Path appPath = new Path(context.getWorkflow().getAppPath());
148 // app path could be a file
149 if (fs.isFile(appPath)) {
150 appPath = appPath.getParent();
151 }
152 JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);
153
154 for (Element commandElement : (List<Element>) element.getChildren()) {
155 String command = commandElement.getName();
156 if (command.equals("mkdir")) {
157 Path path = getPath(commandElement, "path");
158 mkdir(context, fsConf, nameNodePath, path);
159 }
160 else {
161 if (command.equals("delete")) {
162 Path path = getPath(commandElement, "path");
163 delete(context, fsConf, nameNodePath, path);
164 }
165 else {
166 if (command.equals("move")) {
167 Path source = getPath(commandElement, "source");
168 Path target = getPath(commandElement, "target");
169 move(context, fsConf, nameNodePath, source, target, recovery);
170 }
171 else {
172 if (command.equals("chmod")) {
173 Path path = getPath(commandElement, "path");
174 boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null;
175 String str = commandElement.getAttributeValue("dir-files");
176 boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
177 String permissionsMask = commandElement.getAttributeValue("permissions").trim();
178 chmod(context, fsConf, nameNodePath, path, permissionsMask, dirFiles, recursive);
179 }
180 else {
181 if (command.equals("touchz")) {
182 Path path = getPath(commandElement, "path");
183 touchz(context, fsConf, nameNodePath, path);
184 }
185 else {
186 if (command.equals("chgrp")) {
187 Path path = getPath(commandElement, "path");
188 boolean recursive = commandElement.getChild("recursive",
189 commandElement.getNamespace()) != null;
190 String group = commandElement.getAttributeValue("group");
191 String str = commandElement.getAttributeValue("dir-files");
192 boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
193 chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(),
194 group, dirFiles, recursive);
195 }
196 }
197 }
198 }
199 }
200 }
201 }
202 }
203 catch (Exception ex) {
204 throw convertException(ex);
205 }
206 }
207
208 void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group,
209 boolean dirFiles, boolean recursive) throws ActionExecutorException {
210
211 HashMap<String, String> argsMap = new HashMap<String, String>();
212 argsMap.put("user", user);
213 argsMap.put("group", group);
214 try {
215 FileSystem fs = getFileSystemFor(path, context, fsConf);
216 recursiveFsOperation("chgrp", fs, nameNodePath, path, argsMap, dirFiles, recursive, true);
217 }
218 catch (Exception ex) {
219 throw convertException(ex);
220 }
221 }
222
223 private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path,
224 Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot)
225 throws ActionExecutorException {
226
227 try {
228 path = resolveToFullPath(nameNodePath, path, true);
229 if (!fs.exists(path)) {
230 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", op
231 + ", path [{0}] does not exist", path);
232 }
233 FileStatus pathStatus = fs.getFileStatus(path);
234 List<Path> paths = new ArrayList<Path>();
235
236 if (dirFiles && pathStatus.isDir()) {
237 if (isRoot) {
238 paths.add(path);
239 }
240 FileStatus[] filesStatus = fs.listStatus(path);
241 for (int i = 0; i < filesStatus.length; i++) {
242 Path p = filesStatus[i].getPath();
243 paths.add(p);
244 if (recursive && filesStatus[i].isDir()) {
245 recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false);
246 }
247 }
248 }
249 else {
250 paths.add(path);
251 }
252 for (Path p : paths) {
253 doFsOperation(op, fs, p, argsMap);
254 }
255 }
256 catch (Exception ex) {
257 throw convertException(ex);
258 }
259 }
260
261 private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap)
262 throws ActionExecutorException, IOException {
263 if (op.equals("chmod")) {
264 String permissions = argsMap.get("permissions");
265 FsPermission newFsPermission = createShortPermission(permissions, p);
266 fs.setPermission(p, newFsPermission);
267 }
268 else if (op.equals("chgrp")) {
269 String user = argsMap.get("user");
270 String group = argsMap.get("group");
271 fs.setOwner(p, user, group);
272 }
273 }
274
275 /**
276 * @param path
277 * @param context
278 * @param fsConf
279 * @return FileSystem
280 * @throws HadoopAccessorException
281 */
282 private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
283 String user = context.getWorkflow().getUser();
284 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
285 JobConf conf = has.createJobConf(path.toUri().getAuthority());
286 XConfiguration.copy(context.getProtoActionConf(), conf);
287 if (fsConf != null) {
288 XConfiguration.copy(fsConf, conf);
289 }
290 return has.createFileSystem(user, path.toUri(), conf);
291 }
292
293 /**
294 * @param path
295 * @param user
296 * @param group
297 * @return FileSystem
298 * @throws HadoopAccessorException
299 */
300 private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
301 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
302 JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
303 return has.createFileSystem(user, path.toUri(), jobConf);
304 }
305
306 void mkdir(Context context, Path path) throws ActionExecutorException {
307 mkdir(context, null, null, path);
308 }
309
310 void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
311 try {
312 path = resolveToFullPath(nameNodePath, path, true);
313 FileSystem fs = getFileSystemFor(path, context, fsConf);
314
315 if (!fs.exists(path)) {
316 if (!fs.mkdirs(path)) {
317 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
318 "mkdir, path [{0}] could not create directory", path);
319 }
320 }
321 }
322 catch (Exception ex) {
323 throw convertException(ex);
324 }
325 }
326
327 /**
328 * Delete path
329 *
330 * @param context
331 * @param path
332 * @throws ActionExecutorException
333 */
334 public void delete(Context context, Path path) throws ActionExecutorException {
335 delete(context, null, null, path);
336 }
337
338 /**
339 * Delete path
340 *
341 * @param context
342 * @param fsConf
343 * @param nameNodePath
344 * @param path
345 * @throws ActionExecutorException
346 */
347 public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
348 try {
349 path = resolveToFullPath(nameNodePath, path, true);
350 FileSystem fs = getFileSystemFor(path, context, fsConf);
351
352 if (fs.exists(path)) {
353 if (!fs.delete(path, true)) {
354 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
355 "delete, path [{0}] could not delete path", path);
356 }
357 }
358 }
359 catch (Exception ex) {
360 throw convertException(ex);
361 }
362 }
363
364 /**
365 * Delete path
366 *
367 * @param user
368 * @param group
369 * @param path
370 * @throws ActionExecutorException
371 */
372 public void delete(String user, String group, Path path) throws ActionExecutorException {
373 try {
374 validatePath(path, true);
375 FileSystem fs = getFileSystemFor(path, user);
376
377 if (fs.exists(path)) {
378 if (!fs.delete(path, true)) {
379 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
380 "delete, path [{0}] could not delete path", path);
381 }
382 }
383 }
384 catch (Exception ex) {
385 throw convertException(ex);
386 }
387 }
388
389 /**
390 * Move source to target
391 *
392 * @param context
393 * @param source
394 * @param target
395 * @param recovery
396 * @throws ActionExecutorException
397 */
398 public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
399 move(context, null, null, source, target, recovery);
400 }
401
402 /**
403 * Move source to target
404 *
405 * @param context
406 * @param fsConf
407 * @param nameNodePath
408 * @param source
409 * @param target
410 * @param recovery
411 * @throws ActionExecutorException
412 */
413 public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery)
414 throws ActionExecutorException {
415 try {
416 source = resolveToFullPath(nameNodePath, source, true);
417 validateSameNN(source, target);
418 FileSystem fs = getFileSystemFor(source, context, fsConf);
419
420 if (!fs.exists(source) && !recovery) {
421 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
422 "move, source path [{0}] does not exist", source);
423 }
424
425 if (!fs.rename(source, target) && !recovery) {
426 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
427 "move, could not move [{0}] to [{1}]", source, target);
428 }
429 }
430 catch (Exception ex) {
431 throw convertException(ex);
432 }
433 }
434
435 void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
436 chmod(context, null, null, path, permissions, dirFiles, recursive);
437 }
438
439 void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions,
440 boolean dirFiles, boolean recursive) throws ActionExecutorException {
441
442 HashMap<String, String> argsMap = new HashMap<String, String>();
443 argsMap.put("permissions", permissions);
444 try {
445 FileSystem fs = getFileSystemFor(path, context, fsConf);
446 recursiveFsOperation("chmod", fs, nameNodePath, path, argsMap, dirFiles, recursive, true);
447 }
448 catch (Exception ex) {
449 throw convertException(ex);
450 }
451 }
452
453 void touchz(Context context, Path path) throws ActionExecutorException {
454 touchz(context, null, null, path);
455 }
456
457 void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
458 try {
459 path = resolveToFullPath(nameNodePath, path, true);
460 FileSystem fs = getFileSystemFor(path, context, fsConf);
461
462 FileStatus st;
463 if (fs.exists(path)) {
464 st = fs.getFileStatus(path);
465 if (st.isDir()) {
466 throw new Exception(path.toString() + " is a directory");
467 } else if (st.getLen() != 0)
468 throw new Exception(path.toString() + " must be a zero-length file");
469 }
470 FSDataOutputStream out = fs.create(path);
471 out.close();
472 }
473 catch (Exception ex) {
474 throw convertException(ex);
475 }
476 }
477
478 FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
479 if (permissions.length() == 3) {
480 char user = permissions.charAt(0);
481 char group = permissions.charAt(1);
482 char other = permissions.charAt(2);
483 int useri = user - '0';
484 int groupi = group - '0';
485 int otheri = other - '0';
486 int mask = useri * 100 + groupi * 10 + otheri;
487 short omask = Short.parseShort(Integer.toString(mask), 8);
488 return new FsPermission(omask);
489 }
490 else {
491 if (permissions.length() == 10) {
492 return FsPermission.valueOf(permissions);
493 }
494 else {
495 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
496 "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
497 }
498 }
499 }
500
501 @Override
502 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
503 }
504
505 @Override
506 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
507 }
508
509 @Override
510 public void start(Context context, WorkflowAction action) throws ActionExecutorException {
511 try {
512 context.setStartData("-", "-", "-");
513 Element actionXml = XmlUtils.parseXml(action.getConf());
514 doOperations(context, actionXml);
515 context.setExecutionData("OK", null);
516 }
517 catch (Exception ex) {
518 throw convertException(ex);
519 }
520 }
521
522 @Override
523 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
524 String externalStatus = action.getExternalStatus();
525 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
526 WorkflowAction.Status.ERROR;
527 context.setEndData(status, getActionSignal(status));
528 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
529 try {
530 FileSystem fs = context.getAppFileSystem();
531 fs.delete(context.getActionDir(), true);
532 }
533 catch (Exception ex) {
534 throw convertException(ex);
535 }
536 }
537 }
538
539 @Override
540 public boolean isCompleted(String externalStatus) {
541 return true;
542 }
543
544 /**
545 * @param context
546 * @return
547 * @throws HadoopAccessorException
548 * @throws IOException
549 * @throws URISyntaxException
550 */
551 public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
552 return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
553 }
554
555 }