This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.IOException;
021 import java.io.StringReader;
022 import java.net.URISyntaxException;
023 import java.util.Iterator;
024 import java.util.List;
025
026 import org.apache.hadoop.fs.FSDataOutputStream;
027 import org.apache.hadoop.fs.FileStatus;
028 import org.apache.hadoop.fs.FileSystem;
029 import org.apache.hadoop.fs.Path;
030 import org.apache.hadoop.fs.permission.FsPermission;
031 import org.apache.hadoop.mapred.JobConf;
032 import org.apache.oozie.action.ActionExecutor;
033 import org.apache.oozie.action.ActionExecutorException;
034 import org.apache.oozie.client.WorkflowAction;
035 import org.apache.oozie.service.HadoopAccessorException;
036 import org.apache.oozie.service.HadoopAccessorService;
037 import org.apache.oozie.service.Services;
038 import org.apache.oozie.util.XConfiguration;
039 import org.apache.oozie.util.XmlUtils;
040 import org.jdom.Element;
041
042 /**
043 * File system action executor. <p/> This executes the file system mkdir, delete, move, chmod and touchz commands.
044 */
045 public class FsActionExecutor extends ActionExecutor {
046
/**
 * Creates the executor, passing {@code "fs"} to the {@link ActionExecutor} constructor
 * (presumably the action type name this executor handles).
 */
public FsActionExecutor() {
    super("fs");
}
050
051 Path getPath(Element element, String attribute) {
052 String str = element.getAttributeValue(attribute).trim();
053 return new Path(str);
054 }
055
056 void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
057 try {
058 String scheme = path.toUri().getScheme();
059 if (withScheme) {
060 if (scheme == null) {
061 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
062 "Missing scheme in path [{0}]", path);
063 }
064 else {
065 Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri());
066 }
067 }
068 else {
069 if (scheme != null) {
070 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
071 "Scheme [{0}] not allowed in path [{1}]", scheme, path);
072 }
073 }
074 }
075 catch (HadoopAccessorException hex) {
076 throw convertException(hex);
077 }
078 }
079
080 Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
081 Path fullPath;
082
083 // If no nameNode is given, validate the path as-is and return it as-is
084 if (nameNode == null) {
085 validatePath(path, withScheme);
086 fullPath = path;
087 } else {
088 // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier
089 String pathScheme = path.toUri().getScheme();
090 String pathAuthority = path.toUri().getAuthority();
091 if (pathScheme == null || pathAuthority == null) {
092 if (path.isAbsolute()) {
093 String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
094 fullPath = new Path(nameNodeSchemeAuthority + path.toString());
095 } else {
096 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011",
097 "Path [{0}] cannot be relative", path);
098 }
099 } else {
100 // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is
101 // If it is the nameNode, then it should have already been verified earlier so return it as-is
102 if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) {
103 validatePath(path, withScheme);
104 }
105 fullPath = path;
106 }
107 }
108 return fullPath;
109 }
110
111 void validateSameNN(Path source, Path dest) throws ActionExecutorException {
112 Path destPath = new Path(source, dest);
113 String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
114 String s = source.toUri().getScheme() + source.toUri().getAuthority();
115
116 //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
117 if(!t.equals(s)) {
118 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
119 "move, target NN URI different from that of source", dest);
120 }
121 }
122
123 @SuppressWarnings("unchecked")
124 void doOperations(Context context, Element element) throws ActionExecutorException {
125 try {
126 FileSystem fs = context.getAppFileSystem();
127 boolean recovery = fs.exists(getRecoveryPath(context));
128 if (!recovery) {
129 fs.mkdirs(getRecoveryPath(context));
130 }
131
132 Path nameNodePath = null;
133 Element nameNodeElement = element.getChild("name-node", element.getNamespace());
134 if (nameNodeElement != null) {
135 String nameNode = nameNodeElement.getTextTrim();
136 if (nameNode != null) {
137 nameNodePath = new Path(nameNode);
138 // Verify the name node now
139 validatePath(nameNodePath, true);
140 }
141 }
142
143 XConfiguration fsConf = new XConfiguration();
144 Path appPath = new Path(context.getWorkflow().getAppPath());
145 // app path could be a file
146 if (fs.isFile(appPath)) {
147 appPath = appPath.getParent();
148 }
149 JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);
150
151 for (Element commandElement : (List<Element>) element.getChildren()) {
152 String command = commandElement.getName();
153 if (command.equals("mkdir")) {
154 Path path = getPath(commandElement, "path");
155 mkdir(context, fsConf, nameNodePath, path);
156 }
157 else {
158 if (command.equals("delete")) {
159 Path path = getPath(commandElement, "path");
160 delete(context, fsConf,nameNodePath, path);
161 }
162 else {
163 if (command.equals("move")) {
164 Path source = getPath(commandElement, "source");
165 Path target = getPath(commandElement, "target");
166 move(context, fsConf,nameNodePath, source, target, recovery);
167 }
168 else {
169 if (command.equals("chmod")) {
170 Path path = getPath(commandElement, "path");
171 boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null;
172 String str = commandElement.getAttributeValue("dir-files");
173 boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
174 String permissionsMask = commandElement.getAttributeValue("permissions").trim();
175 chmod(context, fsConf,nameNodePath, path, permissionsMask, dirFiles, recursive);
176 }
177 else {
178 if (command.equals("touchz")) {
179 Path path = getPath(commandElement, "path");
180 touchz(context, fsConf,nameNodePath, path);
181 }
182 }
183 }
184 }
185 }
186 }
187 }
188 catch (Exception ex) {
189 throw convertException(ex);
190 }
191 }
192
193 /**
194 * @param path
195 * @param context
196 * @param fsConf
197 * @return FileSystem
198 * @throws HadoopAccessorException
199 */
200 private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
201 String user = context.getWorkflow().getUser();
202 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
203 JobConf conf = has.createJobConf(path.toUri().getAuthority());
204 XConfiguration.copy(context.getProtoActionConf(), conf);
205 if (fsConf != null) {
206 XConfiguration.copy(fsConf, conf);
207 }
208 return has.createFileSystem(user, path.toUri(), conf);
209 }
210
211 /**
212 * @param path
213 * @param user
214 * @param group
215 * @return FileSystem
216 * @throws HadoopAccessorException
217 */
218 private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
219 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
220 JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
221 return has.createFileSystem(user, path.toUri(), jobConf);
222 }
223
224 void mkdir(Context context, Path path) throws ActionExecutorException {
225 mkdir(context, null, null, path);
226 }
227
228 void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
229 try {
230 path = resolveToFullPath(nameNodePath, path, true);
231 FileSystem fs = getFileSystemFor(path, context, fsConf);
232
233 if (!fs.exists(path)) {
234 if (!fs.mkdirs(path)) {
235 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
236 "mkdir, path [{0}] could not create directory", path);
237 }
238 }
239 }
240 catch (Exception ex) {
241 throw convertException(ex);
242 }
243 }
244
245 /**
246 * Delete path
247 *
248 * @param context
249 * @param path
250 * @throws ActionExecutorException
251 */
252 public void delete(Context context, Path path) throws ActionExecutorException {
253 delete(context, null, null, path);
254 }
255
256 /**
257 * Delete path
258 *
259 * @param context
260 * @param fsConf
261 * @param nameNodePath
262 * @param path
263 * @throws ActionExecutorException
264 */
265 public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
266 try {
267 path = resolveToFullPath(nameNodePath, path, true);
268 FileSystem fs = getFileSystemFor(path, context, fsConf);
269
270 if (fs.exists(path)) {
271 if (!fs.delete(path, true)) {
272 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
273 "delete, path [{0}] could not delete path", path);
274 }
275 }
276 }
277 catch (Exception ex) {
278 throw convertException(ex);
279 }
280 }
281
282 /**
283 * Delete path
284 *
285 * @param user
286 * @param group
287 * @param path
288 * @throws ActionExecutorException
289 */
290 public void delete(String user, String group, Path path) throws ActionExecutorException {
291 try {
292 validatePath(path, true);
293 FileSystem fs = getFileSystemFor(path, user);
294
295 if (fs.exists(path)) {
296 if (!fs.delete(path, true)) {
297 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
298 "delete, path [{0}] could not delete path", path);
299 }
300 }
301 }
302 catch (Exception ex) {
303 throw convertException(ex);
304 }
305 }
306
307 /**
308 * Move source to target
309 *
310 * @param context
311 * @param source
312 * @param target
313 * @param recovery
314 * @throws ActionExecutorException
315 */
316 public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
317 move(context, null, null, source, target, recovery);
318 }
319
320 /**
321 * Move source to target
322 *
323 * @param context
324 * @param fsConf
325 * @param nameNodePath
326 * @param source
327 * @param target
328 * @param recovery
329 * @throws ActionExecutorException
330 */
331 public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery)
332 throws ActionExecutorException {
333 try {
334 source = resolveToFullPath(nameNodePath, source, true);
335 validateSameNN(source, target);
336 FileSystem fs = getFileSystemFor(source, context, fsConf);
337
338 if (!fs.exists(source) && !recovery) {
339 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
340 "move, source path [{0}] does not exist", source);
341 }
342
343 if (!fs.rename(source, target) && !recovery) {
344 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
345 "move, could not move [{0}] to [{1}]", source, target);
346 }
347 }
348 catch (Exception ex) {
349 throw convertException(ex);
350 }
351 }
352
353 void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
354 chmod(context, null, null, path, permissions, dirFiles, recursive);
355 }
356
357 void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions, boolean dirFiles,
358 boolean recursive) throws ActionExecutorException {
359 try {
360 path = resolveToFullPath(nameNodePath, path, true);
361 FileSystem fs = getFileSystemFor(path, context, fsConf);
362
363 if (!fs.exists(path)) {
364 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009",
365 "chmod, path [{0}] does not exist", path);
366 }
367
368 FileStatus pathStatus = fs.getFileStatus(path);
369
370 Path[] paths;
371 if (dirFiles && pathStatus.isDir()) {
372 FileStatus[] filesStatus = fs.listStatus(path);
373 paths = new Path[filesStatus.length];
374 for (int i = 0; i < filesStatus.length; i++) {
375 paths[i] = filesStatus[i].getPath();
376 if (recursive && filesStatus[i].isDir()){
377 chmod(context, fsConf, nameNodePath, paths[i], permissions, dirFiles, recursive);
378 }
379 }
380 }
381 else {
382 paths = new Path[]{path};
383 }
384
385 FsPermission newFsPermission = createShortPermission(permissions, path);
386 fs.setPermission(path, newFsPermission);
387 for (Path p : paths) {
388 fs.setPermission(p, newFsPermission);
389 }
390 }
391 catch (Exception ex) {
392 throw convertException(ex);
393 }
394 }
395
396 void touchz(Context context, Path path) throws ActionExecutorException {
397 touchz(context, null, null, path);
398 }
399
400 void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
401 try {
402 path = resolveToFullPath(nameNodePath, path, true);
403 FileSystem fs = getFileSystemFor(path, context, fsConf);
404
405 FileStatus st;
406 if (fs.exists(path)) {
407 st = fs.getFileStatus(path);
408 if (st.isDir()) {
409 throw new Exception(path.toString() + " is a directory");
410 } else if (st.getLen() != 0)
411 throw new Exception(path.toString() + " must be a zero-length file");
412 }
413 FSDataOutputStream out = fs.create(path);
414 out.close();
415 }
416 catch (Exception ex) {
417 throw convertException(ex);
418 }
419 }
420
421 FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
422 if (permissions.length() == 3) {
423 char user = permissions.charAt(0);
424 char group = permissions.charAt(1);
425 char other = permissions.charAt(2);
426 int useri = user - '0';
427 int groupi = group - '0';
428 int otheri = other - '0';
429 int mask = useri * 100 + groupi * 10 + otheri;
430 short omask = Short.parseShort(Integer.toString(mask), 8);
431 return new FsPermission(omask);
432 }
433 else {
434 if (permissions.length() == 10) {
435 return FsPermission.valueOf(permissions);
436 }
437 else {
438 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
439 "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
440 }
441 }
442 }
443
444 @Override
445 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
446 }
447
448 @Override
449 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
450 }
451
452 @Override
453 public void start(Context context, WorkflowAction action) throws ActionExecutorException {
454 try {
455 context.setStartData("-", "-", "-");
456 Element actionXml = XmlUtils.parseXml(action.getConf());
457 doOperations(context, actionXml);
458 context.setExecutionData("OK", null);
459 }
460 catch (Exception ex) {
461 throw convertException(ex);
462 }
463 }
464
465 @Override
466 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
467 String externalStatus = action.getExternalStatus();
468 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
469 WorkflowAction.Status.ERROR;
470 context.setEndData(status, getActionSignal(status));
471 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
472 try {
473 FileSystem fs = context.getAppFileSystem();
474 fs.delete(context.getActionDir(), true);
475 }
476 catch (Exception ex) {
477 throw convertException(ex);
478 }
479 }
480 }
481
482 @Override
483 public boolean isCompleted(String externalStatus) {
484 return true;
485 }
486
487 /**
488 * @param context
489 * @return
490 * @throws HadoopAccessorException
491 * @throws IOException
492 * @throws URISyntaxException
493 */
494 public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
495 return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
496 }
497
498 }