001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.io.StringReader;
022    import java.net.URISyntaxException;
023    import java.util.Iterator;
024    import java.util.List;
025    
026    import org.apache.hadoop.fs.FSDataOutputStream;
027    import org.apache.hadoop.fs.FileStatus;
028    import org.apache.hadoop.fs.FileSystem;
029    import org.apache.hadoop.fs.Path;
030    import org.apache.hadoop.fs.permission.FsPermission;
031    import org.apache.hadoop.mapred.JobConf;
032    import org.apache.oozie.action.ActionExecutor;
033    import org.apache.oozie.action.ActionExecutorException;
034    import org.apache.oozie.client.WorkflowAction;
035    import org.apache.oozie.service.HadoopAccessorException;
036    import org.apache.oozie.service.HadoopAccessorService;
037    import org.apache.oozie.service.Services;
038    import org.apache.oozie.util.XConfiguration;
039    import org.apache.oozie.util.XmlUtils;
040    import org.jdom.Element;
041    
042    /**
043     * File system action executor. <p/> This executes the file system mkdir, move and delete commands
044     */
045    public class FsActionExecutor extends ActionExecutor {
046    
047        public FsActionExecutor() {
048            super("fs");
049        }
050    
051        Path getPath(Element element, String attribute) {
052            String str = element.getAttributeValue(attribute).trim();
053            return new Path(str);
054        }
055    
056        void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
057            try {
058                String scheme = path.toUri().getScheme();
059                if (withScheme) {
060                    if (scheme == null) {
061                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
062                                                          "Missing scheme in path [{0}]", path);
063                    }
064                    else {
065                        Services.get().get(HadoopAccessorService.class).checkSupportedFilesystem(path.toUri());
066                    }
067                }
068                else {
069                    if (scheme != null) {
070                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
071                                                          "Scheme [{0}] not allowed in path [{1}]", scheme, path);
072                    }
073                }
074            }
075            catch (HadoopAccessorException hex) {
076                throw convertException(hex);
077            }
078        }
079    
080        Path resolveToFullPath(Path nameNode, Path path, boolean withScheme) throws ActionExecutorException {
081            Path fullPath;
082    
083            // If no nameNode is given, validate the path as-is and return it as-is
084            if (nameNode == null) {
085                validatePath(path, withScheme);
086                fullPath = path;
087            } else {
088                // If the path doesn't have a scheme or authority, use the nameNode which should have already been verified earlier
089                String pathScheme = path.toUri().getScheme();
090                String pathAuthority = path.toUri().getAuthority();
091                if (pathScheme == null || pathAuthority == null) {
092                    if (path.isAbsolute()) {
093                        String nameNodeSchemeAuthority = nameNode.toUri().getScheme() + "://" + nameNode.toUri().getAuthority();
094                        fullPath = new Path(nameNodeSchemeAuthority + path.toString());
095                    } else {
096                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS011", 
097                                "Path [{0}] cannot be relative", path);
098                    }
099                } else {
100                    // If the path has a scheme and authority, but its not the nameNode then validate the path as-is and return it as-is
101                    // If it is the nameNode, then it should have already been verified earlier so return it as-is
102                    if (!nameNode.toUri().getScheme().equals(pathScheme) || !nameNode.toUri().getAuthority().equals(pathAuthority)) {
103                        validatePath(path, withScheme);
104                    }
105                    fullPath = path;
106                }
107            }
108            return fullPath;
109        }
110    
111        void validateSameNN(Path source, Path dest) throws ActionExecutorException {
112            Path destPath = new Path(source, dest);
113            String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
114            String s = source.toUri().getScheme() + source.toUri().getAuthority();
115    
116            //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
117            if(!t.equals(s)) {
118                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
119                        "move, target NN URI different from that of source", dest);
120            }
121        }
122    
123        @SuppressWarnings("unchecked")
124        void doOperations(Context context, Element element) throws ActionExecutorException {
125            try {
126                FileSystem fs = context.getAppFileSystem();
127                boolean recovery = fs.exists(getRecoveryPath(context));
128                if (!recovery) {
129                    fs.mkdirs(getRecoveryPath(context));
130                }
131                
132                Path nameNodePath = null;
133                Element nameNodeElement = element.getChild("name-node", element.getNamespace());
134                if (nameNodeElement != null) {
135                    String nameNode = nameNodeElement.getTextTrim();
136                    if (nameNode != null) {
137                        nameNodePath = new Path(nameNode);
138                        // Verify the name node now
139                        validatePath(nameNodePath, true);
140                    }
141                }
142                
143                XConfiguration fsConf = new XConfiguration();
144                Path appPath = new Path(context.getWorkflow().getAppPath());
145                // app path could be a file
146                if (fs.isFile(appPath)) {
147                    appPath = appPath.getParent();
148                }
149                JavaActionExecutor.parseJobXmlAndConfiguration(context, element, appPath, fsConf);
150                
151                for (Element commandElement : (List<Element>) element.getChildren()) {
152                    String command = commandElement.getName();
153                    if (command.equals("mkdir")) {
154                        Path path = getPath(commandElement, "path");
155                        mkdir(context, fsConf, nameNodePath, path);
156                    }
157                    else {
158                        if (command.equals("delete")) {
159                            Path path = getPath(commandElement, "path");
160                            delete(context, fsConf,nameNodePath, path);
161                        }
162                        else {
163                            if (command.equals("move")) {
164                                Path source = getPath(commandElement, "source");
165                                Path target = getPath(commandElement, "target");
166                                move(context, fsConf,nameNodePath, source, target, recovery);
167                            }
168                            else {
169                                if (command.equals("chmod")) {
170                                    Path path = getPath(commandElement, "path");
171                                    boolean recursive = commandElement.getChild("recursive", commandElement.getNamespace()) != null;
172                                    String str = commandElement.getAttributeValue("dir-files");
173                                    boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
174                                    String permissionsMask = commandElement.getAttributeValue("permissions").trim();
175                                    chmod(context, fsConf,nameNodePath, path, permissionsMask, dirFiles, recursive);
176                                }
177                                else {
178                                    if (command.equals("touchz")) {
179                                        Path path = getPath(commandElement, "path");
180                                        touchz(context, fsConf,nameNodePath, path);
181                                    }
182                                }
183                            }
184                        }
185                    }
186                }
187            }
188            catch (Exception ex) {
189                throw convertException(ex);
190            }
191        }
192    
193        /**
194         * @param path
195         * @param context
196         * @param fsConf
197         * @return FileSystem
198         * @throws HadoopAccessorException
199         */
200        private FileSystem getFileSystemFor(Path path, Context context, XConfiguration fsConf) throws HadoopAccessorException {
201            String user = context.getWorkflow().getUser();
202            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
203            JobConf conf = has.createJobConf(path.toUri().getAuthority());
204            XConfiguration.copy(context.getProtoActionConf(), conf);
205            if (fsConf != null) {
206                XConfiguration.copy(fsConf, conf);
207            }
208            return has.createFileSystem(user, path.toUri(), conf);
209        }
210    
211        /**
212         * @param path
213         * @param user
214         * @param group
215         * @return FileSystem
216         * @throws HadoopAccessorException
217         */
218        private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
219            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
220            JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
221            return has.createFileSystem(user, path.toUri(), jobConf);
222        }
223    
224        void mkdir(Context context, Path path) throws ActionExecutorException {
225            mkdir(context, null, null, path);
226        }
227    
228        void mkdir(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
229            try {
230                path = resolveToFullPath(nameNodePath, path, true);
231                FileSystem fs = getFileSystemFor(path, context, fsConf);
232    
233                if (!fs.exists(path)) {
234                    if (!fs.mkdirs(path)) {
235                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
236                                                          "mkdir, path [{0}] could not create directory", path);
237                    }
238                }
239            }
240            catch (Exception ex) {
241                throw convertException(ex);
242            }
243        }
244    
245        /**
246         * Delete path
247         *
248         * @param context
249         * @param path
250         * @throws ActionExecutorException
251         */
252        public void delete(Context context, Path path) throws ActionExecutorException {
253            delete(context, null, null, path);
254        }
255    
256        /**
257         * Delete path
258         *
259         * @param context
260         * @param fsConf
261         * @param nameNodePath 
262         * @param path
263         * @throws ActionExecutorException
264         */
265        public void delete(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
266            try {
267                path = resolveToFullPath(nameNodePath, path, true);
268                FileSystem fs = getFileSystemFor(path, context, fsConf);
269    
270                if (fs.exists(path)) {
271                    if (!fs.delete(path, true)) {
272                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
273                                                          "delete, path [{0}] could not delete path", path);
274                    }
275                }
276            }
277            catch (Exception ex) {
278                throw convertException(ex);
279            }
280        }
281    
282        /**
283         * Delete path
284         *
285         * @param user
286         * @param group
287         * @param path
288         * @throws ActionExecutorException
289         */
290        public void delete(String user, String group, Path path) throws ActionExecutorException {
291            try {
292                validatePath(path, true);
293                FileSystem fs = getFileSystemFor(path, user);
294    
295                if (fs.exists(path)) {
296                    if (!fs.delete(path, true)) {
297                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
298                                "delete, path [{0}] could not delete path", path);
299                    }
300                }
301            }
302            catch (Exception ex) {
303                throw convertException(ex);
304            }
305        }
306    
307        /**
308         * Move source to target
309         *
310         * @param context
311         * @param source
312         * @param target
313         * @param recovery
314         * @throws ActionExecutorException
315         */
316        public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
317            move(context, null, null, source, target, recovery);
318        }
319    
320        /**
321         * Move source to target
322         *
323         * @param context
324         * @param fsConf 
325         * @param nameNodePath
326         * @param source
327         * @param target
328         * @param recovery
329         * @throws ActionExecutorException
330         */
331        public void move(Context context, XConfiguration fsConf, Path nameNodePath, Path source, Path target, boolean recovery) 
332                throws ActionExecutorException {
333            try {
334                source = resolveToFullPath(nameNodePath, source, true);
335                validateSameNN(source, target);
336                FileSystem fs = getFileSystemFor(source, context, fsConf);
337    
338                if (!fs.exists(source) && !recovery) {
339                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
340                                                      "move, source path [{0}] does not exist", source);
341                }
342    
343                if (!fs.rename(source, target) && !recovery) {
344                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
345                                                      "move, could not move [{0}] to [{1}]", source, target);
346                }
347            }
348            catch (Exception ex) {
349                throw convertException(ex);
350            }
351        }
352    
353        void chmod(Context context, Path path, String permissions, boolean dirFiles, boolean recursive) throws ActionExecutorException {
354            chmod(context, null, null, path, permissions, dirFiles, recursive);
355        }
356    
357        void chmod(Context context, XConfiguration fsConf, Path nameNodePath, Path path, String permissions, boolean dirFiles, 
358                boolean recursive) throws ActionExecutorException {
359            try {
360                path = resolveToFullPath(nameNodePath, path, true);
361                FileSystem fs = getFileSystemFor(path, context, fsConf);
362    
363                if (!fs.exists(path)) {
364                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009",
365                                                      "chmod, path [{0}] does not exist", path);
366                }
367    
368                FileStatus pathStatus = fs.getFileStatus(path);
369    
370                Path[] paths;
371                if (dirFiles && pathStatus.isDir()) {
372                    FileStatus[] filesStatus = fs.listStatus(path);
373                    paths = new Path[filesStatus.length];
374                    for (int i = 0; i < filesStatus.length; i++) {
375                        paths[i] = filesStatus[i].getPath();
376                        if (recursive && filesStatus[i].isDir()){
377                            chmod(context, fsConf, nameNodePath, paths[i], permissions, dirFiles, recursive);
378                        }
379                    }
380                }
381                else {
382                    paths = new Path[]{path};
383                }
384    
385                FsPermission newFsPermission = createShortPermission(permissions, path);
386                fs.setPermission(path, newFsPermission);
387                for (Path p : paths) {
388                    fs.setPermission(p, newFsPermission);
389                }
390            }
391            catch (Exception ex) {
392                throw convertException(ex);
393            }
394        }
395    
396        void touchz(Context context, Path path) throws ActionExecutorException {
397            touchz(context, null, null, path);
398        }
399    
400        void touchz(Context context, XConfiguration fsConf, Path nameNodePath, Path path) throws ActionExecutorException {
401            try {
402                path = resolveToFullPath(nameNodePath, path, true);
403                FileSystem fs = getFileSystemFor(path, context, fsConf);
404    
405                FileStatus st;
406                if (fs.exists(path)) {
407                    st = fs.getFileStatus(path);
408                    if (st.isDir()) {
409                        throw new Exception(path.toString() + " is a directory");
410                    } else if (st.getLen() != 0)
411                        throw new Exception(path.toString() + " must be a zero-length file");
412                }
413                FSDataOutputStream out = fs.create(path);
414                out.close();
415            }
416            catch (Exception ex) {
417                throw convertException(ex);
418            }
419        }
420    
421        FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
422            if (permissions.length() == 3) {
423                char user = permissions.charAt(0);
424                char group = permissions.charAt(1);
425                char other = permissions.charAt(2);
426                int useri = user - '0';
427                int groupi = group - '0';
428                int otheri = other - '0';
429                int mask = useri * 100 + groupi * 10 + otheri;
430                short omask = Short.parseShort(Integer.toString(mask), 8);
431                return new FsPermission(omask);
432            }
433            else {
434                if (permissions.length() == 10) {
435                    return FsPermission.valueOf(permissions);
436                }
437                else {
438                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
439                                                      "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
440                }
441            }
442        }
443    
444        @Override
445        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
446        }
447    
448        @Override
449        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
450        }
451    
452        @Override
453        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
454            try {
455                context.setStartData("-", "-", "-");
456                Element actionXml = XmlUtils.parseXml(action.getConf());
457                doOperations(context, actionXml);
458                context.setExecutionData("OK", null);
459            }
460            catch (Exception ex) {
461                throw convertException(ex);
462            }
463        }
464    
465        @Override
466        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
467            String externalStatus = action.getExternalStatus();
468            WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
469                                           WorkflowAction.Status.ERROR;
470            context.setEndData(status, getActionSignal(status));
471            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
472                try {
473                    FileSystem fs = context.getAppFileSystem();
474                    fs.delete(context.getActionDir(), true);
475                }
476                catch (Exception ex) {
477                    throw convertException(ex);
478                }
479            }
480        }
481    
482        @Override
483        public boolean isCompleted(String externalStatus) {
484            return true;
485        }
486    
487        /**
488         * @param context
489         * @return
490         * @throws HadoopAccessorException
491         * @throws IOException
492         * @throws URISyntaxException
493         */
494        public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
495            return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
496        }
497    
498    }