001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.action.hadoop;
019
020import java.io.IOException;
021import java.net.URISyntaxException;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026
027import org.apache.hadoop.fs.FSDataOutputStream;
028import org.apache.hadoop.fs.FileStatus;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.FileUtil;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.fs.permission.FsPermission;
033import org.apache.hadoop.mapred.JobConf;
034import org.apache.oozie.action.ActionExecutor;
035import org.apache.oozie.action.ActionExecutorException;
036import org.apache.oozie.client.WorkflowAction;
037import org.apache.oozie.service.HadoopAccessorException;
038import org.apache.oozie.service.HadoopAccessorService;
039import org.apache.oozie.service.Services;
040import org.apache.oozie.util.XConfiguration;
041import org.apache.oozie.util.XmlUtils;
042import org.jdom.Element;
043
/**
 * File system action executor. <p/> This executes the file system mkdir, delete, move, chmod, touchz and
 * chgrp commands.
 */
047public class FsActionExecutor extends ActionExecutor {
048
049    private final int maxGlobCount;
050
051    public FsActionExecutor() {
052        super("fs");
053        maxGlobCount = getOozieConf().getInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX,
054                LauncherMapper.GLOB_MAX_DEFAULT);
055    }
056
057    Path getPath(Element element, String attribute) {
058        String str = element.getAttributeValue(attribute).trim();
059        return new Path(str);
060    }
061
062    void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
063        try {
064            String scheme = path.toUri().getScheme();
065            if (withScheme) {
066                if (scheme == null) {
067                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
068                                                      "Missing scheme in path [{0}]", path);
069                }
070                else {
071                    Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri());
072                }
073            }
074            else {
075                if (scheme != null) {
076                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
077                                                      "Scheme [{0}] not allowed in path [{1}]", scheme, path);
078                }
079            }
080        }
081        catch (HadoopAccessorException hex) {
082            throw convertException(hex);
083        }
084    }
085
086    Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
087        Path fullPath;
088
089        // If no nameNode is given, validate the path as-is and return it as-is
090        if (nameNode == null) {
091            validatePath(path, withScheme);
092            fullPath = path;
093        } else {
094            // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier
095            String pathScheme = path.toUri().getScheme();
096            String pathAuthority = path.toUri().getAuthority();
097            if (pathScheme == null || pathAuthority == null) {
098                if (path.isAbsolute()) {
099                    String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
100                    fullPath = new Path(nameNodeSchemeAuthority + path.toString());
101                } else {
102                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011",
103                            "Path [{0}] cannot be relative", path);
104                }
105            } else {
106                // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is
107                // If it is the nameNode, then it should have already been verified earlier so return it as-is
108                if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) {
109                    validatePath(path, withScheme);
110                }
111                fullPath = path;
112            }
113        }
114        return fullPath;
115    }
116
117    void validateSameNN(Path source, Path dest) throws ActionExecutorException {
118        Path destPath = new Path(source, dest);
119        String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
120        String s = source.toUri().getScheme() + source.toUri().getAuthority();
121
122        //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
123        if(!t.equals(s)) {
124            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
125                    "move, target NN URI different from that of source", dest);
126        }
127    }
128
129    @SuppressWarnings("unchecked")
130    void doOperations(Context context, Element element) throws ActionExecutorException {
131        try {
132            FileSystem fs = context.getAppFileSystem();
133            boolean recovery = fs.exists(getRecoveryPath(context));
134            if (!recovery) {
135                fs.mkdirs(getRecoveryPath(context));
136            }
137
138            Path nameNodePath = null;
139            Element nameNodeElement = element.getChild("name-node", element.getNamespace());
140            if (nameNodeElement != null) {
141                String nameNode = nameNodeElement.getTextTrim();
142                if (nameNode != null) {
143                    nameNodePath = new Path(nameNode);
144                    // Verify the name node now
145                    validatePath(nameNodePath, true);
146                }
147            }
148
149            XConfiguration fsConf = new XConfiguration();
150            Path appPath = new Path(context.getWorkflow().getAppPath());
151            // app path could be a file
152            if (fs.isFile(appPath)) {
153                appPath = appPath.getParent();
154            }
155            JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);
156
157            for (Element commandElement : (List<Element>) element.getChildren()) {
158                String command = commandElement.getName();
159                if (command.equals("mkdir")) {
160                    Path path = getPath(commandElement, "path");
161                    mkdir(context, fsConf, nameNodePath, path);
162                }
163                else {
164                    if (command.equals("delete")) {
165                        Path path = getPath(commandElement, "path");
166                        delete(context, fsConf, nameNodePath, path);
167                    }
168                    else {
169                        if (command.equals("move")) {
170                            Path source = getPath(commandElement, "source");
171                            Path target = getPath(commandElement, "target");
172                            move(context, fsConf, nameNodePath, source, target, recovery);
173                        }
174                        else {
175                            if (command.equals("chmod")) {
176                                Path path = getPath(commandElement, "path");
177                                boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null;
178                                String str = commandElement.getAttributeValue("dir-files");
179                                boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
180                                String permissionsMask = commandElement.getAttributeValue("permissions").trim();
181                                chmod(context, fsConf, nameNodePath, path, permissionsMask, dirFiles, recursive);
182                            }
183                            else {
184                                if (command.equals("touchz")) {
185                                    Path path = getPath(commandElement, "path");
186                                    touchz(context, fsConf, nameNodePath, path);
187                                }
188                                else {
189                                    if (command.equals("chgrp")) {
190                                        Path path = getPath(commandElement, "path");
191                                        boolean recursive = commandElement.getChild("recursive",
192                                                commandElement.getNamespace()) != null;
193                                        String group = commandElement.getAttributeValue("group");
194                                        String str = commandElement.getAttributeValue("dir-files");
195                                        boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
196                                        chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(),
197                                                group, dirFiles, recursive);
198                                    }
199                                }
200                            }
201                        }
202                    }
203                }
204            }
205        }
206        catch (Exception ex) {
207            throw convertException(ex);
208        }
209    }
210
211    void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group,
212            boolean dirFiles, boolean recursive) throws ActionExecutorException {
213
214        HashMap<String, String> argsMap = new HashMap<String, String>();
215        argsMap.put("user", user);
216        argsMap.put("group", group);
217        try {
218            FileSystem fs = getFileSystemFor(path, context, fsConf);
219            path = resolveToFullPath(nameNodePath, path, true);
220            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
221            if (pathArr == null || pathArr.length == 0) {
222                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chgrp"
223                        + ", path(s) that matches [{0}] does not exist", path);
224            }
225            checkGlobMax(pathArr);
226            for (Path p : pathArr) {
227                recursiveFsOperation("chgrp", fs, nameNodePath, p, argsMap, dirFiles, recursive, true);
228            }
229        }
230        catch (Exception ex) {
231            throw convertException(ex);
232        }
233    }
234
235    private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path,
236            Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot)
237            throws ActionExecutorException {
238
239        try {
240            FileStatus pathStatus = fs.getFileStatus(path);
241            List<Path> paths = new ArrayList<Path>();
242
243            if (dirFiles && pathStatus.isDir()) {
244                if (isRoot) {
245                    paths.add(path);
246                }
247                FileStatus[] filesStatus = fs.listStatus(path);
248                for (int i = 0; i < filesStatus.length; i++) {
249                    Path p = filesStatus[i].getPath();
250                    paths.add(p);
251                    if (recursive && filesStatus[i].isDir()) {
252                        recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false);
253                    }
254                }
255            }
256            else {
257                paths.add(path);
258            }
259            for (Path p : paths) {
260                doFsOperation(op, fs, p, argsMap);
261            }
262        }
263        catch (Exception ex) {
264            throw convertException(ex);
265        }
266    }
267
268    private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap)
269            throws ActionExecutorException, IOException {
270        if (op.equals("chmod")) {
271            String permissions = argsMap.get("permissions");
272            FsPermission newFsPermission = createShortPermission(permissions, p);
273            fs.setPermission(p, newFsPermission);
274        }
275        else if (op.equals("chgrp")) {
276            String user = argsMap.get("user");
277            String group = argsMap.get("group");
278            fs.setOwner(p, user, group);
279        }
280    }
281
282    /**
283     * @param path
284     * @param context
285     * @param fsConf
286     * @return FileSystem
287     * @throws HadoopAccessorException
288     */
289    private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
290        String user = context.getWorkflow().getUser();
291        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
292        JobConf conf = has.createJobConf(path.toUri().getAuthority());
293        XConfiguration.copy(context.getProtoActionConf(), conf);
294        if (fsConf != null) {
295            XConfiguration.copy(fsConf, conf);
296        }
297        return has.createFileSystem(user, path.toUri(), conf);
298    }
299
300    /**
301     * @param path
302     * @param user
303     * @param group
304     * @return FileSystem
305     * @throws HadoopAccessorException
306     */
307    private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
308        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
309        JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
310        return has.createFileSystem(user, path.toUri(), jobConf);
311    }
312
313    void mkdir(Context context, Path path) throws ActionExecutorException {
314        mkdir(context, null, null, path);
315    }
316
317    void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
318        try {
319            path = resolveToFullPath(nameNodePath, path, true);
320            FileSystem fs = getFileSystemFor(path, context, fsConf);
321
322            if (!fs.exists(path)) {
323                if (!fs.mkdirs(path)) {
324                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
325                                                      "mkdir, path [{0}] could not create directory", path);
326                }
327            }
328        }
329        catch (Exception ex) {
330            throw convertException(ex);
331        }
332    }
333
334    /**
335     * Delete path
336     *
337     * @param context
338     * @param path
339     * @throws ActionExecutorException
340     */
341    public void delete(Context context, Path path) throws ActionExecutorException {
342        delete(context, null, null, path);
343    }
344
345    /**
346     * Delete path
347     *
348     * @param context
349     * @param fsConf
350     * @param nameNodePath
351     * @param path
352     * @throws ActionExecutorException
353     */
354    public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
355        try {
356            path = resolveToFullPath(nameNodePath, path, true);
357            FileSystem fs = getFileSystemFor(path, context, fsConf);
358            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
359            if (pathArr != null && pathArr.length > 0) {
360                checkGlobMax(pathArr);
361                for (Path p : pathArr) {
362                    if (fs.exists(p)) {
363                        if (!fs.delete(p, true)) {
364                            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
365                                    "delete, path [{0}] could not delete path", p);
366                        }
367                    }
368                }
369            }
370        }
371        catch (Exception ex) {
372            throw convertException(ex);
373        }
374    }
375
376    /**
377     * Delete path
378     *
379     * @param user
380     * @param group
381     * @param path
382     * @throws ActionExecutorException
383     */
384    public void delete(String user, String group, Path path) throws ActionExecutorException {
385        try {
386            validatePath(path, true);
387            FileSystem fs = getFileSystemFor(path, user);
388
389            if (fs.exists(path)) {
390                if (!fs.delete(path, true)) {
391                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
392                            "delete, path [{0}] could not delete path", path);
393                }
394            }
395        }
396        catch (Exception ex) {
397            throw convertException(ex);
398        }
399    }
400
401    /**
402     * Move source to target
403     *
404     * @param context
405     * @param source
406     * @param target
407     * @param recovery
408     * @throws ActionExecutorException
409     */
410    public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
411        move(context, null, null, source, target, recovery);
412    }
413
414    /**
415     * Move source to target
416     *
417     * @param context
418     * @param fsConf
419     * @param nameNodePath
420     * @param source
421     * @param target
422     * @param recovery
423     * @throws ActionExecutorException
424     */
425    public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery)
426            throws ActionExecutorException {
427        try {
428            source = resolveToFullPath(nameNodePath, source, true);
429            validateSameNN(source, target);
430            FileSystem fs = getFileSystemFor(source, context, fsConf);
431            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(source));
432            if (( pathArr == null || pathArr.length == 0 ) ){
433                if (!recovery) {
434                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
435                        "move, source path [{0}] does not exist", source);
436                } else {
437                    return;
438                }
439            }
440            if (pathArr.length > 1 && (!fs.exists(target) || fs.isFile(target))) {
441                if(!recovery) {
442                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS012",
443                            "move, could not rename multiple sources to the same target name");
444                } else {
445                    return;
446                }
447            }
448            checkGlobMax(pathArr);
449            for (Path p : pathArr) {
450                if (!fs.rename(p, target) && !recovery) {
451                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
452                            "move, could not move [{0}] to [{1}]", p, target);
453                }
454            }
455        }
456        catch (Exception ex) {
457            throw convertException(ex);
458        }
459    }
460
461    void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
462        chmod(context, null, null, path, permissions, dirFiles, recursive);
463    }
464
465    void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions,
466            boolean dirFiles, boolean recursive) throws ActionExecutorException {
467
468        HashMap<String, String> argsMap = new HashMap<String, String>();
469        argsMap.put("permissions", permissions);
470        try {
471            FileSystem fs = getFileSystemFor(path, context, fsConf);
472            path = resolveToFullPath(nameNodePath, path, true);
473            Path[] pathArr = FileUtil.stat2Paths(fs.globStatus(path));
474            if (pathArr == null || pathArr.length == 0) {
475                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", "chmod"
476                        + ", path(s) that matches [{0}] does not exist", path);
477            }
478            checkGlobMax(pathArr);
479            for (Path p : pathArr) {
480                recursiveFsOperation("chmod", fs, nameNodePath, p, argsMap, dirFiles, recursive, true);
481            }
482
483        }
484        catch (Exception ex) {
485            throw convertException(ex);
486        }
487    }
488
489    void touchz(Context context, Path path) throws ActionExecutorException {
490        touchz(context, null, null, path);
491    }
492
493    void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
494        try {
495            path = resolveToFullPath(nameNodePath, path, true);
496            FileSystem fs = getFileSystemFor(path, context, fsConf);
497
498            FileStatus st;
499            if (fs.exists(path)) {
500                st = fs.getFileStatus(path);
501                if (st.isDir()) {
502                    throw new Exception(path.toString() + " is a directory");
503                } else if (st.getLen() != 0)
504                    throw new Exception(path.toString() + " must be a zero-length file");
505            }
506            FSDataOutputStream out = fs.create(path);
507            out.close();
508        }
509        catch (Exception ex) {
510            throw convertException(ex);
511        }
512    }
513
514    FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
515        if (permissions.length() == 3) {
516            char user = permissions.charAt(0);
517            char group = permissions.charAt(1);
518            char other = permissions.charAt(2);
519            int useri = user - '0';
520            int groupi = group - '0';
521            int otheri = other - '0';
522            int mask = useri * 100 + groupi * 10 + otheri;
523            short omask = Short.parseShort(Integer.toString(mask), 8);
524            return new FsPermission(omask);
525        }
526        else {
527            if (permissions.length() == 10) {
528                return FsPermission.valueOf(permissions);
529            }
530            else {
531                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
532                                                  "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
533            }
534        }
535    }
536
537    @Override
538    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
539    }
540
541    @Override
542    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
543    }
544
545    @Override
546    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
547        try {
548            context.setStartData("-", "-", "-");
549            Element actionXml = XmlUtils.parseXml(action.getConf());
550            doOperations(context, actionXml);
551            context.setExecutionData("OK", null);
552        }
553        catch (Exception ex) {
554            throw convertException(ex);
555        }
556    }
557
558    @Override
559    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
560        String externalStatus = action.getExternalStatus();
561        WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
562                                       WorkflowAction.Status.ERROR;
563        context.setEndData(status, getActionSignal(status));
564        if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
565            try {
566                FileSystem fs = context.getAppFileSystem();
567                fs.delete(context.getActionDir(), true);
568            }
569            catch (Exception ex) {
570                throw convertException(ex);
571            }
572        }
573    }
574
575    @Override
576    public boolean isCompleted(String externalStatus) {
577        return true;
578    }
579
580    /**
581     * @param context
582     * @return
583     * @throws HadoopAccessorException
584     * @throws IOException
585     * @throws URISyntaxException
586     */
587    public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
588        return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
589    }
590
591    private void checkGlobMax(Path[] pathArr) throws ActionExecutorException {
592        if(pathArr.length > maxGlobCount) {
593            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS013",
594                    "too many globbed files/dirs to do FS operation");
595        }
596    }
597
598}