001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URISyntaxException;
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.List;
027    import java.util.Map;
028    
029    import org.apache.hadoop.fs.FSDataOutputStream;
030    import org.apache.hadoop.fs.FileStatus;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.hadoop.fs.permission.FsPermission;
034    import org.apache.hadoop.mapred.JobConf;
035    import org.apache.oozie.action.ActionExecutor;
036    import org.apache.oozie.action.ActionExecutorException;
037    import org.apache.oozie.client.WorkflowAction;
038    import org.apache.oozie.service.HadoopAccessorException;
039    import org.apache.oozie.service.HadoopAccessorService;
040    import org.apache.oozie.service.Services;
041    import org.apache.oozie.util.XConfiguration;
042    import org.apache.oozie.util.XmlUtils;
043    import org.jdom.Element;
044    
045    /**
046     * File system action executor. <p/> This executes the file system mkdir, move and delete commands
047     */
048    public class FsActionExecutor extends ActionExecutor {
049    
050        public FsActionExecutor() {
051            super("fs");
052        }
053    
054        Path getPath(Element element, String attribute) {
055            String str = element.getAttributeValue(attribute).trim();
056            return new Path(str);
057        }
058    
059        void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
060            try {
061                String scheme = path.toUri().getScheme();
062                if (withScheme) {
063                    if (scheme == null) {
064                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
065                                                          "Missing scheme in path [{0}]", path);
066                    }
067                    else {
068                        Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri());
069                    }
070                }
071                else {
072                    if (scheme != null) {
073                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
074                                                          "Scheme [{0}] not allowed in path [{1}]", scheme, path);
075                    }
076                }
077            }
078            catch (HadoopAccessorException hex) {
079                throw convertException(hex);
080            }
081        }
082    
083        Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
084            Path fullPath;
085    
086            // If no nameNode is given, validate the path as-is and return it as-is
087            if (nameNode == null) {
088                validatePath(path, withScheme);
089                fullPath = path;
090            } else {
091                // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier
092                String pathScheme = path.toUri().getScheme();
093                String pathAuthority = path.toUri().getAuthority();
094                if (pathScheme == null || pathAuthority == null) {
095                    if (path.isAbsolute()) {
096                        String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
097                        fullPath = new Path(nameNodeSchemeAuthority + path.toString());
098                    } else {
099                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011",
100                                "Path [{0}] cannot be relative", path);
101                    }
102                } else {
103                    // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is
104                    // If it is the nameNode, then it should have already been verified earlier so return it as-is
105                    if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) {
106                        validatePath(path, withScheme);
107                    }
108                    fullPath = path;
109                }
110            }
111            return fullPath;
112        }
113    
114        void validateSameNN(Path source, Path dest) throws ActionExecutorException {
115            Path destPath = new Path(source, dest);
116            String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
117            String s = source.toUri().getScheme() + source.toUri().getAuthority();
118    
119            //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
120            if(!t.equals(s)) {
121                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
122                        "move, target NN URI different from that of source", dest);
123            }
124        }
125    
126        @SuppressWarnings("unchecked")
127        void doOperations(Context context, Element element) throws ActionExecutorException {
128            try {
129                FileSystem fs = context.getAppFileSystem();
130                boolean recovery = fs.exists(getRecoveryPath(context));
131                if (!recovery) {
132                    fs.mkdirs(getRecoveryPath(context));
133                }
134    
135                Path nameNodePath = null;
136                Element nameNodeElement = element.getChild("name-node", element.getNamespace());
137                if (nameNodeElement != null) {
138                    String nameNode = nameNodeElement.getTextTrim();
139                    if (nameNode != null) {
140                        nameNodePath = new Path(nameNode);
141                        // Verify the name node now
142                        validatePath(nameNodePath, true);
143                    }
144                }
145    
146                XConfiguration fsConf = new XConfiguration();
147                Path appPath = new Path(context.getWorkflow().getAppPath());
148                // app path could be a file
149                if (fs.isFile(appPath)) {
150                    appPath = appPath.getParent();
151                }
152                JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);
153    
154                for (Element commandElement : (List<Element>) element.getChildren()) {
155                    String command = commandElement.getName();
156                    if (command.equals("mkdir")) {
157                        Path path = getPath(commandElement, "path");
158                        mkdir(context, fsConf, nameNodePath, path);
159                    }
160                    else {
161                        if (command.equals("delete")) {
162                            Path path = getPath(commandElement, "path");
163                            delete(context, fsConf, nameNodePath, path);
164                        }
165                        else {
166                            if (command.equals("move")) {
167                                Path source = getPath(commandElement, "source");
168                                Path target = getPath(commandElement, "target");
169                                move(context, fsConf, nameNodePath, source, target, recovery);
170                            }
171                            else {
172                                if (command.equals("chmod")) {
173                                    Path path = getPath(commandElement, "path");
174                                    boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null;
175                                    String str = commandElement.getAttributeValue("dir-files");
176                                    boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
177                                    String permissionsMask = commandElement.getAttributeValue("permissions").trim();
178                                    chmod(context, fsConf, nameNodePath, path, permissionsMask, dirFiles, recursive);
179                                }
180                                else {
181                                    if (command.equals("touchz")) {
182                                        Path path = getPath(commandElement, "path");
183                                        touchz(context, fsConf, nameNodePath, path);
184                                    }
185                                    else {
186                                        if (command.equals("chgrp")) {
187                                            Path path = getPath(commandElement, "path");
188                                            boolean recursive = commandElement.getChild("recursive",
189                                                    commandElement.getNamespace()) != null;
190                                            String group = commandElement.getAttributeValue("group");
191                                            String str = commandElement.getAttributeValue("dir-files");
192                                            boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
193                                            chgrp(context, fsConf, nameNodePath, path, context.getWorkflow().getUser(),
194                                                    group, dirFiles, recursive);
195                                        }
196                                    }
197                                }
198                            }
199                        }
200                    }
201                }
202            }
203            catch (Exception ex) {
204                throw convertException(ex);
205            }
206        }
207    
208        void chgrp(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String user, String group,
209                boolean dirFiles, boolean recursive) throws ActionExecutorException {
210    
211            HashMap<String, String> argsMap = new HashMap<String, String>();
212            argsMap.put("user", user);
213            argsMap.put("group", group);
214            try {
215                FileSystem fs = getFileSystemFor(path, context, fsConf);
216                recursiveFsOperation("chgrp", fs, nameNodePath, path, argsMap, dirFiles, recursive, true);
217            }
218            catch (Exception ex) {
219                throw convertException(ex);
220            }
221        }
222    
223        private void recursiveFsOperation(String op, FileSystem fs, Path nameNodePath, Path path,
224                Map<String, String> argsMap, boolean dirFiles, boolean recursive, boolean isRoot)
225                throws ActionExecutorException {
226    
227            try {
228                path = resolveToFullPath(nameNodePath, path, true);
229                if (!fs.exists(path)) {
230                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009", op
231                            + ", path [{0}] does not exist", path);
232                }
233                FileStatus pathStatus = fs.getFileStatus(path);
234                List<Path> paths = new ArrayList<Path>();
235    
236                if (dirFiles && pathStatus.isDir()) {
237                    if (isRoot) {
238                        paths.add(path);
239                    }
240                    FileStatus[] filesStatus = fs.listStatus(path);
241                    for (int i = 0; i < filesStatus.length; i++) {
242                        Path p = filesStatus[i].getPath();
243                        paths.add(p);
244                        if (recursive && filesStatus[i].isDir()) {
245                            recursiveFsOperation(op, fs, null, p, argsMap, dirFiles, recursive, false);
246                        }
247                    }
248                }
249                else {
250                    paths.add(path);
251                }
252                for (Path p : paths) {
253                    doFsOperation(op, fs, p, argsMap);
254                }
255            }
256            catch (Exception ex) {
257                throw convertException(ex);
258            }
259        }
260    
261        private void doFsOperation(String op, FileSystem fs, Path p, Map<String, String> argsMap)
262                throws ActionExecutorException, IOException {
263            if (op.equals("chmod")) {
264                String permissions = argsMap.get("permissions");
265                FsPermission newFsPermission = createShortPermission(permissions, p);
266                fs.setPermission(p, newFsPermission);
267            }
268            else if (op.equals("chgrp")) {
269                String user = argsMap.get("user");
270                String group = argsMap.get("group");
271                fs.setOwner(p, user, group);
272            }
273        }
274    
275        /**
276         * @param path
277         * @param context
278         * @param fsConf
279         * @return FileSystem
280         * @throws HadoopAccessorException
281         */
282        private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
283            String user = context.getWorkflow().getUser();
284            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
285            JobConf conf = has.createJobConf(path.toUri().getAuthority());
286            XConfiguration.copy(context.getProtoActionConf(), conf);
287            if (fsConf != null) {
288                XConfiguration.copy(fsConf, conf);
289            }
290            return has.createFileSystem(user, path.toUri(), conf);
291        }
292    
293        /**
294         * @param path
295         * @param user
296         * @param group
297         * @return FileSystem
298         * @throws HadoopAccessorException
299         */
300        private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
301            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
302            JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
303            return has.createFileSystem(user, path.toUri(), jobConf);
304        }
305    
306        void mkdir(Context context, Path path) throws ActionExecutorException {
307            mkdir(context, null, null, path);
308        }
309    
310        void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
311            try {
312                path = resolveToFullPath(nameNodePath, path, true);
313                FileSystem fs = getFileSystemFor(path, context, fsConf);
314    
315                if (!fs.exists(path)) {
316                    if (!fs.mkdirs(path)) {
317                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
318                                                          "mkdir, path [{0}] could not create directory", path);
319                    }
320                }
321            }
322            catch (Exception ex) {
323                throw convertException(ex);
324            }
325        }
326    
327        /**
328         * Delete path
329         *
330         * @param context
331         * @param path
332         * @throws ActionExecutorException
333         */
334        public void delete(Context context, Path path) throws ActionExecutorException {
335            delete(context, null, null, path);
336        }
337    
338        /**
339         * Delete path
340         *
341         * @param context
342         * @param fsConf
343         * @param nameNodePath
344         * @param path
345         * @throws ActionExecutorException
346         */
347        public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
348            try {
349                path = resolveToFullPath(nameNodePath, path, true);
350                FileSystem fs = getFileSystemFor(path, context, fsConf);
351    
352                if (fs.exists(path)) {
353                    if (!fs.delete(path, true)) {
354                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
355                                                          "delete, path [{0}] could not delete path", path);
356                    }
357                }
358            }
359            catch (Exception ex) {
360                throw convertException(ex);
361            }
362        }
363    
364        /**
365         * Delete path
366         *
367         * @param user
368         * @param group
369         * @param path
370         * @throws ActionExecutorException
371         */
372        public void delete(String user, String group, Path path) throws ActionExecutorException {
373            try {
374                validatePath(path, true);
375                FileSystem fs = getFileSystemFor(path, user);
376    
377                if (fs.exists(path)) {
378                    if (!fs.delete(path, true)) {
379                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
380                                "delete, path [{0}] could not delete path", path);
381                    }
382                }
383            }
384            catch (Exception ex) {
385                throw convertException(ex);
386            }
387        }
388    
389        /**
390         * Move source to target
391         *
392         * @param context
393         * @param source
394         * @param target
395         * @param recovery
396         * @throws ActionExecutorException
397         */
398        public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
399            move(context, null, null, source, target, recovery);
400        }
401    
402        /**
403         * Move source to target
404         *
405         * @param context
406         * @param fsConf
407         * @param nameNodePath
408         * @param source
409         * @param target
410         * @param recovery
411         * @throws ActionExecutorException
412         */
413        public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery)
414                throws ActionExecutorException {
415            try {
416                source = resolveToFullPath(nameNodePath, source, true);
417                validateSameNN(source, target);
418                FileSystem fs = getFileSystemFor(source, context, fsConf);
419    
420                if (!fs.exists(source) && !recovery) {
421                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
422                                                      "move, source path [{0}] does not exist", source);
423                }
424    
425                if (!fs.rename(source, target) && !recovery) {
426                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
427                                                      "move, could not move [{0}] to [{1}]", source, target);
428                }
429            }
430            catch (Exception ex) {
431                throw convertException(ex);
432            }
433        }
434    
435        void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
436            chmod(context, null, null, path, permissions, dirFiles, recursive);
437        }
438    
439        void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions,
440                boolean dirFiles, boolean recursive) throws ActionExecutorException {
441    
442            HashMap<String, String> argsMap = new HashMap<String, String>();
443            argsMap.put("permissions", permissions);
444            try {
445                FileSystem fs = getFileSystemFor(path, context, fsConf);
446                recursiveFsOperation("chmod", fs, nameNodePath, path, argsMap, dirFiles, recursive, true);
447            }
448            catch (Exception ex) {
449                throw convertException(ex);
450            }
451        }
452    
453        void touchz(Context context, Path path) throws ActionExecutorException {
454            touchz(context, null, null, path);
455        }
456    
457        void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
458            try {
459                path = resolveToFullPath(nameNodePath, path, true);
460                FileSystem fs = getFileSystemFor(path, context, fsConf);
461    
462                FileStatus st;
463                if (fs.exists(path)) {
464                    st = fs.getFileStatus(path);
465                    if (st.isDir()) {
466                        throw new Exception(path.toString() + " is a directory");
467                    } else if (st.getLen() != 0)
468                        throw new Exception(path.toString() + " must be a zero-length file");
469                }
470                FSDataOutputStream out = fs.create(path);
471                out.close();
472            }
473            catch (Exception ex) {
474                throw convertException(ex);
475            }
476        }
477    
478        FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
479            if (permissions.length() == 3) {
480                char user = permissions.charAt(0);
481                char group = permissions.charAt(1);
482                char other = permissions.charAt(2);
483                int useri = user - '0';
484                int groupi = group - '0';
485                int otheri = other - '0';
486                int mask = useri * 100 + groupi * 10 + otheri;
487                short omask = Short.parseShort(Integer.toString(mask), 8);
488                return new FsPermission(omask);
489            }
490            else {
491                if (permissions.length() == 10) {
492                    return FsPermission.valueOf(permissions);
493                }
494                else {
495                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
496                                                      "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
497                }
498            }
499        }
500    
501        @Override
502        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
503        }
504    
505        @Override
506        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
507        }
508    
509        @Override
510        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
511            try {
512                context.setStartData("-", "-", "-");
513                Element actionXml = XmlUtils.parseXml(action.getConf());
514                doOperations(context, actionXml);
515                context.setExecutionData("OK", null);
516            }
517            catch (Exception ex) {
518                throw convertException(ex);
519            }
520        }
521    
522        @Override
523        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
524            String externalStatus = action.getExternalStatus();
525            WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
526                                           WorkflowAction.Status.ERROR;
527            context.setEndData(status, getActionSignal(status));
528            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
529                try {
530                    FileSystem fs = context.getAppFileSystem();
531                    fs.delete(context.getActionDir(), true);
532                }
533                catch (Exception ex) {
534                    throw convertException(ex);
535                }
536            }
537        }
538    
539        @Override
540        public boolean isCompleted(String externalStatus) {
541            return true;
542        }
543    
544        /**
545         * @param context
546         * @return
547         * @throws HadoopAccessorException
548         * @throws IOException
549         * @throws URISyntaxException
550         */
551        public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
552            return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
553        }
554    
555    }