001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.hadoop;
020
021import java.io.IOException;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029
030import org.apache.hadoop.fs.FSDataOutputStream;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.FileUtil;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.fs.Trash;
036import org.apache.hadoop.fs.permission.FsPermission;
037import org.apache.hadoop.mapred.JobConf;
038import org.apache.hadoop.security.AccessControlException;
039import org.apache.hadoop.security.UserGroupInformation;
040import org.apache.oozie.action.ActionExecutor;
041import org.apache.oozie.action.ActionExecutorException;
042import org.apache.oozie.client.OozieClient;
043import org.apache.oozie.client.WorkflowAction;
044import org.apache.oozie.dependency.FSURIHandler;
045import org.apache.oozie.dependency.URIHandler;
046import org.apache.oozie.service.ConfigurationService;
047import org.apache.oozie.service.HadoopAccessorException;
048import org.apache.oozie.service.HadoopAccessorService;
049import org.apache.oozie.service.Services;
050import org.apache.oozie.service.UserGroupInformationService;
051import org.apache.oozie.service.URIHandlerService;
052import org.apache.oozie.util.XConfiguration;
053import org.apache.oozie.util.XmlUtils;
054import org.jdom.Element;
055
/**
 * File system action executor. <p> This executes the file system mkdir, delete, move, chmod, touchz and chgrp
 * commands.
 */
059public class FsActionExecutor extends ActionExecutor {
060
061    public static final String ACTION_TYPE = "fs";
062
063    private final int maxGlobCount;
064
065    public FsActionExecutor() {
066        super(ACTION_TYPE);
067        maxGlobCount = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX);
068    }
069
070    /**
071    * Initialize Action.
072    */
073    @Override
074    public void initActionType() {
075        super.initActionType();
076        registerError(AccessControlException.class.getName(), ActionExecutorException.ErrorType.ERROR, "FS014");
077    }
078
079    Path getPath(Element element, String attribute) {
080        String str = element.getAttributeValue(attribute).trim();
081        return new Path(str);
082    }
083
084    void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
085        try {
086            String scheme = path.toUri().getScheme();
087            if (withScheme) {
088                if (scheme == null) {
089                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
090                                                      "Missing scheme in path [{0}]", path);
091                }
092                else {
093                    Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri());
094                }
095            }
096            else {
097                if (scheme != null) {
098                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
099                                                      "Scheme [{0}] not allowed in path [{1}]", scheme, path);
100                }
101            }
102        }
103        catch (HadoopAccessorException hex) {
104            throw convertException(hex);
105        }
106    }
107
108    Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
109        Path fullPath;
110
111        // If no nameNode is given, validate the path as-is and return it as-is
112        if (nameNode == null) {
113            validatePath(path, withScheme);
114            fullPath = path;
115        } else {
116            // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier
117            String pathScheme = path.toUri().getScheme();
118            String pathAuthority = path.toUri().getAuthority();
119            if (pathScheme == null || pathAuthority == null) {
120                if (path.isAbsolute()) {
121                    String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
122                    fullPath = new Path(nameNodeSchemeAuthority + path.toString());
123                } else {
124                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011",
125                            "Path [{0}] cannot be relative", path);
126                }
127            } else {
128                // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is
129                // If it is the nameNode, then it should have already been verified earlier so return it as-is
130                if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) {
131                    validatePath(path, withScheme);
132                }
133                fullPath = path;
134            }
135        }
136        return fullPath;
137    }
138
139    void validateSameNN(Path source, Path dest) throws ActionExecutorException {
140        Path destPath = new Path(source, dest);
141        String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
142        String s = source.toUri().getScheme() + source.toUri().getAuthority();
143
144        //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
145        if(!t.equals(s)) {
146            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
147                    "move, target NN URI different from that of source", dest);
148        }
149    }
150
151    @SuppressWarnings("unchecked")
152    void doOperations(Context context, Element element) throws ActionExecutorException {
153        try {
154            FileSystem fs = context.getAppFileSystem();
155            boolean recovery = fs.exists(getRecoveryPath(context));
156            if (!recovery) {
157                fs.mkdirs(getRecoveryPath(context));
158            }
159
160            Path nameNodePath = null;
161            Element nameNodeElement = element.getChild("name-node", element.getNamespace());
162            if (nameNodeElement != null) {
163                String nameNode = nameNodeElement.getTextTrim();
164                if (nameNode != null) {
165                    nameNodePath = new Path(nameNode);
166                    // Verify the name node now
167                    validatePath(nameNodePath, true);
168                }
169            }
170
171            XConfiguration fsConf = new XConfiguration();
172            Path appPath = new Path(context.getWorkflow().getAppPath());
173            // app path could be a file
174            if (fs.isFile(appPath)) {
175                appPath = appPath.getParent();
176            }
177            JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);
178
179            for (Element commandElement : (List<Element>) element.getChildren()) {
180                String command = commandElement.getName();
181                if (command.equals("mkdir")) {
182                    Path path = getPath(commandElement, "path");
183                    mkdir(context, fsConf, nameNodePath, path);
184                }
185                else {
186                    if (command.equals("delete")) {
187                        Path path = getPath(commandElement, "path");
188                        boolean skipTrash = true;
189                        if (commandElement.getAttributeValue("skip-trash") != null &&
190                                commandElement.getAttributeValue("skip-trash").equals("false")) {
191                            skipTrash = false;
192                        }
193                        delete(context, fsConf, nameNodePath, path, skipTrash);
194                    }
195                    else {
196                        if (command.equals("move")) {
197                            Path source = getPath(commandElement, "source");
198                            Path target = getPath(commandElement, "target");
199                            move(context, fsConf, nameNodePath, source, target, recovery);
200                        }
201                        else {
202                            if (command.equals("chmod")) {
203                                Path path = getPath(commandElement, "path");
204                                boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null;
205                                String str = commandElement.getAttributeValue("dir-files");
206                                boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
207                                String permissionsMask = commandElement.getAttributeValue("permissions").trim();
208                                chmod(context, fsConf, nameNodePath, path, permissionsMask, dirFiles, recursive);
209                            }
210                            else {
211                                if (command.equals("touchz")) {
212                                    Path path = getPath(commandElement, "path");
213                                    touchz(context, fsConf, nameNodePath, path);
214                                }
215                                else {
216                                    if (command.equals("chgrp")) {
217                                        Path path = getPath(commandElement, "path");
218                                        boolean recursive = commandElement.getChild("recursive",
219                                                commandElement.getNamespace()) != null;
220                                        String group = commandElement.getAttributeValue("group");
221                                        String str = commandElement.getAttributeValue("dir-files");
222                                        boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
223                                        chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(),
224                                                group, dirFiles, recursive);
225                                    }
226                                }
227                            }
228                        }
229                    }
230                }
231            }
232        }
233        catch (Exception ex) {
234            throw convertException(ex);
235        }
236    }
237
238    void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group,
239            boolean dirFiles, boolean recursive) throws ActionExecutorException {
240
241        HashMap<String, String> argsMap = new HashMap<String, String>();
242        argsMap.put("user", user);
243        argsMap.put("group", group);
244        try {
245            FileSystem fs = getFileSystemFor(path, context, fsConf);
246            path = resolveToFullPath(nameNodePath, path, true);
247            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
248            if (pathArr == null || pathArr.length == 0) {
249                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chgrp"
250                        + ", path(s) that matches [{0}] does not exist", path);
251            }
252            checkGlobMax(pathArr);
253            for (Path p : pathArr) {
254                recursiveFsOperation("chgrp", fs, nameNodePath, p, argsMap, dirFiles, recursive, true);
255            }
256        }
257        catch (Exception ex) {
258            throw convertException(ex);
259        }
260    }
261
262    private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path,
263            Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot)
264            throws ActionExecutorException {
265
266        try {
267            FileStatus pathStatus = fs.getFileStatus(path);
268            List<Path> paths = new ArrayList<Path>();
269
270            if (dirFiles && pathStatus.isDir()) {
271                if (isRoot) {
272                    paths.add(path);
273                }
274                FileStatus[] filesStatus = fs.listStatus(path);
275                for (int i = 0; i < filesStatus.length; i++) {
276                    Path p = filesStatus[i].getPath();
277                    paths.add(p);
278                    if (recursive && filesStatus[i].isDir()) {
279                        recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false);
280                    }
281                }
282            }
283            else {
284                paths.add(path);
285            }
286            for (Path p : paths) {
287                doFsOperation(op, fs, p, argsMap);
288            }
289        }
290        catch (Exception ex) {
291            throw convertException(ex);
292        }
293    }
294
295    private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap)
296            throws ActionExecutorException, IOException {
297        if (op.equals("chmod")) {
298            String permissions = argsMap.get("permissions");
299            FsPermission newFsPermission = createShortPermission(permissions, p);
300            fs.setPermission(p, newFsPermission);
301        }
302        else if (op.equals("chgrp")) {
303            String user = argsMap.get("user");
304            String group = argsMap.get("group");
305            fs.setOwner(p, user, group);
306        }
307    }
308
309    /**
310     * @param path
311     * @param context
312     * @param fsConf
313     * @return FileSystem
314     * @throws HadoopAccessorException
315     */
316    private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
317        String user = context.getWorkflow().getUser();
318        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
319        JobConf conf = has.createJobConf(path.toUri().getAuthority());
320        XConfiguration.copy(context.getProtoActionConf(), conf);
321        if (fsConf != null) {
322            XConfiguration.copy(fsConf, conf);
323        }
324        return has.createFileSystem(user, path.toUri(), conf);
325    }
326
327    /**
328     * @param path
329     * @param user
330     * @return FileSystem
331     * @throws HadoopAccessorException
332     */
333    private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
334        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
335        JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
336        return has.createFileSystem(user, path.toUri(), jobConf);
337    }
338
    /**
     * Creates a directory, without extra fs configuration and without a name node to resolve against.
     *
     * @param context executor context
     * @param path directory to create
     * @throws ActionExecutorException if the directory cannot be created
     */
    void mkdir(Context context, Path path) throws ActionExecutorException {
        mkdir(context, null, null, path);
    }
342
343    void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
344        try {
345            path = resolveToFullPath(nameNodePath, path, true);
346            FileSystem fs = getFileSystemFor(path, context, fsConf);
347
348            if (!fs.exists(path)) {
349                if (!fs.mkdirs(path)) {
350                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
351                                                      "mkdir, path [{0}] could not create directory", path);
352                }
353            }
354        }
355        catch (Exception ex) {
356            throw convertException(ex);
357        }
358    }
359
    /**
     * Deletes a path, skipping the trash, without extra fs configuration and without a name node to resolve
     * against.
     *
     * @param context executor context
     * @param path path to delete
     * @throws ActionExecutorException if the deletion fails
     */
    public void delete(Context context, Path path) throws ActionExecutorException {
        delete(context, null, null, path, true);
    }
370
371    /**
372     * Delete path
373     *
374     * @param context
375     * @param fsConf
376     * @param nameNodePath
377     * @param path
378     * @throws ActionExecutorException
379     */
380    public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path, boolean skipTrash)
381            throws ActionExecutorException {
382        URI uri = path.toUri();
383        URIHandler handler;
384        org.apache.oozie.dependency.URIHandler.Context hcatContext = null;
385        try {
386            handler = Services.get().get(URIHandlerService.class).getURIHandler(uri);
387            if (handler instanceof FSURIHandler) {
388                // Use legacy code to handle hdfs partition deletion
389                path = resolveToFullPath(nameNodePath, path, true);
390                final FileSystem fs = getFileSystemFor(path, context, fsConf);
391                Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
392                if (pathArr != null && pathArr.length > 0) {
393                    checkGlobMax(pathArr);
394                    for (final Path p : pathArr) {
395                        if (fs.exists(p)) {
396                            if (!skipTrash) {
397                                // Moving directory/file to trash of user.
398                                UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
399                                UserGroupInformation ugi = ugiService.getProxyUser(fs.getConf().get(OozieClient.USER_NAME));
400                                ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
401                                    @Override
402                                    public FileSystem run() throws Exception {
403                                        Trash trash = new Trash(fs.getConf());
404                                        if (!trash.moveToTrash(p)) {
405                                            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
406                                                    "Could not move path [{0}] to trash on delete", p);
407                                        }
408                                        return null;
409                                    }
410                                });
411                            }
412                            else if (!fs.delete(p, true)) {
413                                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
414                                        "delete, path [{0}] could not delete path", p);
415                            }
416                        }
417                    }
418                }
419            } else {
420                hcatContext = handler.getContext(uri, fsConf, context.getWorkflow().getUser(), false);
421                handler.delete(uri, hcatContext);
422            }
423        }
424        catch (Exception ex) {
425            throw convertException(ex);
426        }
427        finally{
428            if (hcatContext != null) {
429                hcatContext.destroy();
430            }
431        }
432    }
433
434    /**
435     * Delete path
436     *
437     * @param user
438     * @param group
439     * @param path
440     * @throws ActionExecutorException
441     */
442    public void delete(String user, String group, Path path) throws ActionExecutorException {
443        try {
444            validatePath(path, true);
445            FileSystem fs = getFileSystemFor(path, user);
446
447            if (fs.exists(path)) {
448                if (!fs.delete(path, true)) {
449                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
450                            "delete, path [{0}] could not delete path", path);
451                }
452            }
453        }
454        catch (Exception ex) {
455            throw convertException(ex);
456        }
457    }
458
    /**
     * Moves source to target, without extra fs configuration and without a name node to resolve against.
     *
     * @param context executor context
     * @param source source path or glob pattern
     * @param target target path
     * @param recovery whether the action is being re-run; passed through unchanged
     * @throws ActionExecutorException if the move fails
     */
    public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
        move(context, null, null, source, target, recovery);
    }
471
472    /**
473     * Move source to target
474     *
475     * @param context
476     * @param fsConf
477     * @param nameNodePath
478     * @param source
479     * @param target
480     * @param recovery
481     * @throws ActionExecutorException
482     */
483    public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery)
484            throws ActionExecutorException {
485        try {
486            source = resolveToFullPath(nameNodePath, source, true);
487            validateSameNN(source, target);
488            FileSystem fs = getFileSystemFor(source, context, fsConf);
489            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(source));
490            if (( pathArr == null || pathArr.length == 0 ) ){
491                if (!recovery) {
492                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
493                        "move, source path [{0}] does not exist", source);
494                } else {
495                    return;
496                }
497            }
498            if (pathArr.length > 1 && (!fs.exists(target) || fs.isFile(target))) {
499                if(!recovery) {
500                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS012",
501                            "move, could not rename multiple sources to the same target name");
502                } else {
503                    return;
504                }
505            }
506            checkGlobMax(pathArr);
507            for (Path p : pathArr) {
508                if (!fs.rename(p, target) && !recovery) {
509                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
510                            "move, could not move [{0}] to [{1}]", p, target);
511                }
512            }
513        }
514        catch (Exception ex) {
515            throw convertException(ex);
516        }
517    }
518
    /**
     * Changes permissions, without extra fs configuration and without a name node to resolve against.
     *
     * @param context executor context
     * @param path path or glob pattern to operate on
     * @param permissions permissions mask
     * @param dirFiles passed through unchanged
     * @param recursive passed through unchanged
     * @throws ActionExecutorException if the operation fails
     */
    void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
        chmod(context, null, null, path, permissions, dirFiles, recursive);
    }
522
523    void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions,
524            boolean dirFiles, boolean recursive) throws ActionExecutorException {
525
526        HashMap<String, String> argsMap = new HashMap<String, String>();
527        argsMap.put("permissions", permissions);
528        try {
529            FileSystem fs = getFileSystemFor(path, context, fsConf);
530            path = resolveToFullPath(nameNodePath, path, true);
531            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
532            if (pathArr == null || pathArr.length == 0) {
533                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chmod"
534                        + ", path(s) that matches [{0}] does not exist", path);
535            }
536            checkGlobMax(pathArr);
537            for (Path p : pathArr) {
538                recursiveFsOperation("chmod", fs, nameNodePath, p, argsMap, dirFiles, recursive, true);
539            }
540
541        }
542        catch (Exception ex) {
543            throw convertException(ex);
544        }
545    }
546
    /**
     * Creates a zero-length file, without extra fs configuration and without a name node to resolve against.
     *
     * @param context executor context
     * @param path file to create
     * @throws ActionExecutorException if the operation fails
     */
    void touchz(Context context, Path path) throws ActionExecutorException {
        touchz(context, null, null, path);
    }
550
551    void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
552        try {
553            path = resolveToFullPath(nameNodePath, path, true);
554            FileSystem fs = getFileSystemFor(path, context, fsConf);
555
556            FileStatus st;
557            if (fs.exists(path)) {
558                st = fs.getFileStatus(path);
559                if (st.isDir()) {
560                    throw new Exception(path.toString() + " is a directory");
561                } else if (st.getLen() != 0) {
562                    throw new Exception(path.toString() + " must be a zero-length file");
563                }
564            }
565            FSDataOutputStream out = fs.create(path);
566            out.close();
567        }
568        catch (Exception ex) {
569            throw convertException(ex);
570        }
571    }
572
573    FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
574        if (permissions.length() == 3) {
575            char user = permissions.charAt(0);
576            char group = permissions.charAt(1);
577            char other = permissions.charAt(2);
578            int useri = user - '0';
579            int groupi = group - '0';
580            int otheri = other - '0';
581            int mask = useri * 100 + groupi * 10 + otheri;
582            short omask = Short.parseShort(Integer.toString(mask), 8);
583            return new FsPermission(omask);
584        }
585        else {
586            if (permissions.length() == 10) {
587                return FsPermission.valueOf(permissions);
588            }
589            else {
590                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
591                                                  "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
592            }
593        }
594    }
595
    /**
     * Does nothing.
     *
     * @param context executor context (unused)
     * @param action workflow action (unused)
     * @throws ActionExecutorException never thrown by this implementation
     */
    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
    }
599
    /**
     * Does nothing.
     *
     * @param context executor context (unused)
     * @param action workflow action (unused)
     * @throws ActionExecutorException never thrown by this implementation
     */
    @Override
    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
    }
603
604    @Override
605    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
606        try {
607            context.setStartData("-", "-", "-");
608            Element actionXml = XmlUtils.parseXml(action.getConf());
609            doOperations(context, actionXml);
610            context.setExecutionData("OK", null);
611        }
612        catch (Exception ex) {
613            throw convertException(ex);
614        }
615    }
616
617    @Override
618    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
619        String externalStatus = action.getExternalStatus();
620        WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
621                                       WorkflowAction.Status.ERROR;
622        context.setEndData(status, getActionSignal(status));
623        if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
624            try {
625                FileSystem fs = context.getAppFileSystem();
626                fs.delete(context.getActionDir(), true);
627            }
628            catch (Exception ex) {
629                throw convertException(ex);
630            }
631        }
632    }
633
    /**
     * Reports every external status as completed.
     *
     * @param externalStatus external status (ignored)
     * @return always {@code true}
     */
    @Override
    public boolean isCompleted(String externalStatus) {
        return true;
    }
638
639    /**
640     * @param context
641     * @return
642     * @throws HadoopAccessorException
643     * @throws IOException
644     * @throws URISyntaxException
645     */
646    public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
647        return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
648    }
649
650    private void checkGlobMax(Path[] pathArr) throws ActionExecutorException {
651        if(pathArr.length > maxGlobCount) {
652            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013",
653                    "too many globbed files/dirs to do FS operation");
654        }
655    }
656
    /**
     * Indicates support for configuration and job-xml elements in the action definition.
     * NOTE(review): presumably overrides a method of the parent executor; confirm and add {@code @Override}.
     *
     * @return always {@code true}
     */
    public boolean supportsConfigurationJobXML() {
        return true;
    }
660}