001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.net.URISyntaxException;
022    import java.util.List;
023    
024    import org.apache.hadoop.conf.Configuration;
025    import org.apache.hadoop.fs.FileStatus;
026    import org.apache.hadoop.fs.FileSystem;
027    import org.apache.hadoop.fs.Path;
028    import org.apache.hadoop.fs.permission.FsPermission;
029    import org.apache.oozie.action.ActionExecutor;
030    import org.apache.oozie.action.ActionExecutorException;
031    import org.apache.oozie.client.WorkflowAction;
032    import org.apache.oozie.service.HadoopAccessorException;
033    import org.apache.oozie.service.HadoopAccessorService;
034    import org.apache.oozie.service.Services;
035    import org.apache.oozie.util.XmlUtils;
036    import org.jdom.Element;
037    
038    /**
039     * File system action executor. <p/> This executes the file system mkdir, move and delete commands
040     */
041    public class FsActionExecutor extends ActionExecutor {
042    
043        public FsActionExecutor() {
044            super("fs");
045        }
046    
047        Path getPath(Element element, String attribute) {
048            String str = element.getAttributeValue(attribute).trim();
049            return new Path(str);
050        }
051    
052        void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
053            String scheme = path.toUri().getScheme();
054            if (withScheme) {
055                if (scheme == null) {
056                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
057                                                      "Missing scheme in path [{0}]", path);
058                }
059                else {
060                    if (!scheme.equals("hdfs")) {
061                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
062                                                          "Scheme [{0}] not supported in path [{1}]", scheme, path);
063                    }
064                }
065            }
066            else { 
067                if (scheme != null) {
068                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS003",
069                                                      "Scheme [{0}] not allowed in path [{1}]", scheme, path);
070                }
071            }
072        }
073    
074        void validateSameNN(Path source, Path dest) throws ActionExecutorException {
075            Path destPath = new Path(source, dest);
076            String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
077            String s = source.toUri().getScheme() + source.toUri().getAuthority();
078    
079            //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
080            if(!t.equals(s)) {
081                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
082                        "move, target NN URI different from that of source", dest);
083            }
084        }
085    
086        @SuppressWarnings("unchecked")
087        void doOperations(Context context, Element element) throws ActionExecutorException {
088            try {
089                FileSystem fs = context.getAppFileSystem();
090                boolean recovery = fs.exists(getRecoveryPath(context));
091                if (!recovery) {
092                    fs.mkdirs(getRecoveryPath(context));
093                }
094                for (Element commandElement : (List<Element>) element.getChildren()) {
095                    String command = commandElement.getName();
096                    if (command.equals("mkdir")) {
097                        Path path = getPath(commandElement, "path");
098                        mkdir(context, path);
099                    }
100                    else {
101                        if (command.equals("delete")) {
102                            Path path = getPath(commandElement, "path");
103                            delete(context, path);
104                        }
105                        else {
106                            if (command.equals("move")) {
107                                Path source = getPath(commandElement, "source");
108                                Path target = getPath(commandElement, "target");
109                                move(context, source, target, recovery);
110                            }
111                            else {
112                                if (command.equals("chmod")) {
113                                    Path path = getPath(commandElement, "path");
114                                    String str = commandElement.getAttributeValue("dir-files");
115                                    boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
116                                    String permissionsMask = commandElement.getAttributeValue("permissions").trim();
117                                    chmod(context, path, permissionsMask, dirFiles);
118                                }
119                            }
120                        }
121                    }
122                }
123            }
124            catch (Exception ex) {
125                throw convertException(ex);
126            }
127        }
128    
129        /**
130         * @param path
131         * @param context
132         * @return FileSystem
133         * @throws HadoopAccessorException
134         */
135        private FileSystem getFileSystemFor(Path path, Context context) throws HadoopAccessorException {
136            String user = context.getWorkflow().getUser();
137            String group = context.getWorkflow().getGroup();
138            return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(),
139                    new Configuration());
140        }
141    
142        /**
143         * @param path
144         * @param user
145         * @param group
146         * @return FileSystem
147         * @throws HadoopAccessorException
148         */
149        private FileSystem getFileSystemFor(Path path, String user, String group) throws HadoopAccessorException {
150            return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(),
151                    new Configuration());
152        }
153    
154        void mkdir(Context context, Path path) throws ActionExecutorException {
155            try {
156                validatePath(path, true);
157                FileSystem fs = getFileSystemFor(path, context);
158    
159                if (!fs.exists(path)) {
160                    if (!fs.mkdirs(path)) {
161                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
162                                                          "mkdir, path [{0}] could not create directory", path);
163                    }
164                }
165            }
166            catch (Exception ex) {
167                throw convertException(ex);
168            }
169        }
170    
171        /**
172         * Delete path
173         *
174         * @param context
175         * @param path
176         * @throws ActionExecutorException
177         */
178        public void delete(Context context, Path path) throws ActionExecutorException {
179            try {
180                validatePath(path, true);
181                FileSystem fs = getFileSystemFor(path, context);
182    
183                if (fs.exists(path)) {
184                    if (!fs.delete(path, true)) {
185                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
186                                                          "delete, path [{0}] could not delete path", path);
187                    }
188                }
189            }
190            catch (Exception ex) {
191                throw convertException(ex);
192            }
193        }
194    
195        /**
196         * Delete path
197         *
198         * @param user
199         * @param group
200         * @param path
201         * @throws ActionExecutorException
202         */
203        public void delete(String user, String group, Path path) throws ActionExecutorException {
204            try {
205                validatePath(path, true);
206                FileSystem fs = getFileSystemFor(path, user, group);
207    
208                if (fs.exists(path)) {
209                    if (!fs.delete(path, true)) {
210                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
211                                "delete, path [{0}] could not delete path", path);
212                    }
213                }
214            }
215            catch (Exception ex) {
216                throw convertException(ex);
217            }
218        }
219    
220        /**
221         * Move source to target
222         *
223         * @param context
224         * @param source
225         * @param target
226         * @param recovery
227         * @throws ActionExecutorException
228         */
229        public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
230            try {
231                validatePath(source, true);
232                validateSameNN(source, target);
233                FileSystem fs = getFileSystemFor(source, context);
234    
235                if (!fs.exists(source) && !recovery) {
236                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
237                                                      "move, source path [{0}] does not exist", source);
238                }
239    
240                if (!fs.rename(source, target) && !recovery) {
241                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
242                                                      "move, could not move [{0}] to [{1}]", source, target);
243                }
244            }
245            catch (Exception ex) {
246                throw convertException(ex);
247            }
248        }
249    
250        void chmod(Context context, Path path, String permissions, boolean dirFiles) throws ActionExecutorException {
251            try {
252                validatePath(path, true);
253                FileSystem fs = getFileSystemFor(path, context);
254    
255                if (!fs.exists(path)) {
256                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009",
257                                                      "chmod, path [{0}] does not exist", path);
258                }
259    
260                FileStatus pathStatus = fs.getFileStatus(path);
261    
262                Path[] paths;
263                if (dirFiles && pathStatus.isDir()) {
264                    FileStatus[] filesStatus = fs.listStatus(path);
265                    paths = new Path[filesStatus.length];
266                    for (int i = 0; i < filesStatus.length; i++) {
267                        paths[i] = filesStatus[i].getPath();
268                    }
269                }
270                else {
271                    paths = new Path[]{path};
272                }
273    
274                FsPermission newFsPermission = createShortPermission(permissions, path);
275                fs.setPermission(path, newFsPermission);
276                for (Path p : paths) {
277                    fs.setPermission(p, newFsPermission);
278                }
279            }
280            catch (Exception ex) {
281                throw convertException(ex);
282            }
283        }
284    
285        FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
286            if (permissions.length() == 3) {
287                char user = permissions.charAt(0);
288                char group = permissions.charAt(1);
289                char other = permissions.charAt(2);
290                int useri = user - '0';
291                int groupi = group - '0';
292                int otheri = other - '0';
293                int mask = useri * 100 + groupi * 10 + otheri;
294                short omask = Short.parseShort(Integer.toString(mask), 8);
295                return new FsPermission(omask);
296            }
297            else {
298                if (permissions.length() == 10) {
299                    return FsPermission.valueOf(permissions);
300                }
301                else {
302                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
303                                                      "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
304                }
305            }
306        }
307    
308        @Override
309        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
310        }
311    
312        @Override
313        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
314        }
315    
316        @Override
317        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
318            try {
319                context.setStartData("-", "-", "-");
320                Element actionXml = XmlUtils.parseXml(action.getConf());
321                doOperations(context, actionXml);
322                context.setExecutionData("OK", null);
323            }
324            catch (Exception ex) {
325                throw convertException(ex);
326            }
327        }
328    
329        @Override
330        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
331            String externalStatus = action.getExternalStatus();
332            WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
333                                           WorkflowAction.Status.ERROR;
334            context.setEndData(status, getActionSignal(status));
335            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
336                try {
337                    FileSystem fs = context.getAppFileSystem();
338                    fs.delete(context.getActionDir(), true);
339                }
340                catch (Exception ex) {
341                    throw convertException(ex);
342                }
343            }
344        }
345    
346        @Override
347        public boolean isCompleted(String externalStatus) {
348            return true;
349        }
350    
351        /**
352         * @param context
353         * @return
354         * @throws HadoopAccessorException
355         * @throws IOException
356         * @throws URISyntaxException
357         */
358        public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
359            return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
360        }
361    
362    }