001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.net.URISyntaxException;
022    import java.util.List;
023    
024    import org.apache.hadoop.fs.FileStatus;
025    import org.apache.hadoop.fs.FileSystem;
026    import org.apache.hadoop.fs.Path;
027    import org.apache.hadoop.fs.permission.FsPermission;
028    import org.apache.hadoop.mapred.JobConf;
029    import org.apache.oozie.action.ActionExecutor;
030    import org.apache.oozie.action.ActionExecutorException;
031    import org.apache.oozie.client.WorkflowAction;
032    import org.apache.oozie.service.HadoopAccessorException;
033    import org.apache.oozie.service.HadoopAccessorService;
034    import org.apache.oozie.service.Services;
035    import org.apache.oozie.util.XConfiguration;
036    import org.apache.oozie.util.XmlUtils;
037    import org.jdom.Element;
038    
039    /**
040     * File system action executor. <p/> This executes the file system mkdir, move and delete commands
041     */
042    public class FsActionExecutor extends ActionExecutor {
043    
044        public FsActionExecutor() {
045            super("fs");
046        }
047    
048        Path getPath(Element element, String attribute) {
049            String str = element.getAttributeValue(attribute).trim();
050            return new Path(str);
051        }
052    
053        void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
054            String scheme = path.toUri().getScheme();
055            if (withScheme) {
056                if (scheme == null) {
057                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
058                                                      "Missing scheme in path [{0}]", path);
059                }
060                else {
061                    if (!scheme.equals("hdfs")) {
062                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
063                                                          "Scheme [{0}] not supported in path [{1}]", scheme, path);
064                    }
065                }
066            }
067            else { 
068                if (scheme != null) {
069                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS003",
070                                                      "Scheme [{0}] not allowed in path [{1}]", scheme, path);
071                }
072            }
073        }
074    
075        void validateSameNN(Path source, Path dest) throws ActionExecutorException {
076            Path destPath = new Path(source, dest);
077            String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
078            String s = source.toUri().getScheme() + source.toUri().getAuthority();
079    
080            //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
081            if(!t.equals(s)) {
082                throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
083                        "move, target NN URI different from that of source", dest);
084            }
085        }
086    
087        @SuppressWarnings("unchecked")
088        void doOperations(Context context, Element element) throws ActionExecutorException {
089            try {
090                FileSystem fs = context.getAppFileSystem();
091                boolean recovery = fs.exists(getRecoveryPath(context));
092                if (!recovery) {
093                    fs.mkdirs(getRecoveryPath(context));
094                }
095                for (Element commandElement : (List<Element>) element.getChildren()) {
096                    String command = commandElement.getName();
097                    if (command.equals("mkdir")) {
098                        Path path = getPath(commandElement, "path");
099                        mkdir(context, path);
100                    }
101                    else {
102                        if (command.equals("delete")) {
103                            Path path = getPath(commandElement, "path");
104                            delete(context, path);
105                        }
106                        else {
107                            if (command.equals("move")) {
108                                Path source = getPath(commandElement, "source");
109                                Path target = getPath(commandElement, "target");
110                                move(context, source, target, recovery);
111                            }
112                            else {
113                                if (command.equals("chmod")) {
114                                    Path path = getPath(commandElement, "path");
115                                    String str = commandElement.getAttributeValue("dir-files");
116                                    boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
117                                    String permissionsMask = commandElement.getAttributeValue("permissions").trim();
118                                    chmod(context, path, permissionsMask, dirFiles);
119                                }
120                            }
121                        }
122                    }
123                }
124            }
125            catch (Exception ex) {
126                throw convertException(ex);
127            }
128        }
129    
130        /**
131         * @param path
132         * @param context
133         * @return FileSystem
134         * @throws HadoopAccessorException
135         */
136        private FileSystem getFileSystemFor(Path path, Context context) throws HadoopAccessorException {
137            String user = context.getWorkflow().getUser();
138            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
139            JobConf conf = has.createJobConf(path.toUri().getAuthority());
140            XConfiguration.copy(context.getProtoActionConf(), conf);
141            return has.createFileSystem(user, path.toUri(), conf);
142        }
143    
144        /**
145         * @param path
146         * @param user
147         * @param group
148         * @return FileSystem
149         * @throws HadoopAccessorException
150         */
151        private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
152            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
153            JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
154            return has.createFileSystem(user, path.toUri(), jobConf);
155        }
156    
157        void mkdir(Context context, Path path) throws ActionExecutorException {
158            try {
159                validatePath(path, true);
160                FileSystem fs = getFileSystemFor(path, context);
161    
162                if (!fs.exists(path)) {
163                    if (!fs.mkdirs(path)) {
164                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
165                                                          "mkdir, path [{0}] could not create directory", path);
166                    }
167                }
168            }
169            catch (Exception ex) {
170                throw convertException(ex);
171            }
172        }
173    
174        /**
175         * Delete path
176         *
177         * @param context
178         * @param path
179         * @throws ActionExecutorException
180         */
181        public void delete(Context context, Path path) throws ActionExecutorException {
182            try {
183                validatePath(path, true);
184                FileSystem fs = getFileSystemFor(path, context);
185    
186                if (fs.exists(path)) {
187                    if (!fs.delete(path, true)) {
188                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
189                                                          "delete, path [{0}] could not delete path", path);
190                    }
191                }
192            }
193            catch (Exception ex) {
194                throw convertException(ex);
195            }
196        }
197    
198        /**
199         * Delete path
200         *
201         * @param user
202         * @param group
203         * @param path
204         * @throws ActionExecutorException
205         */
206        public void delete(String user, String group, Path path) throws ActionExecutorException {
207            try {
208                validatePath(path, true);
209                FileSystem fs = getFileSystemFor(path, user);
210    
211                if (fs.exists(path)) {
212                    if (!fs.delete(path, true)) {
213                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
214                                "delete, path [{0}] could not delete path", path);
215                    }
216                }
217            }
218            catch (Exception ex) {
219                throw convertException(ex);
220            }
221        }
222    
223        /**
224         * Move source to target
225         *
226         * @param context
227         * @param source
228         * @param target
229         * @param recovery
230         * @throws ActionExecutorException
231         */
232        public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
233            try {
234                validatePath(source, true);
235                validateSameNN(source, target);
236                FileSystem fs = getFileSystemFor(source, context);
237    
238                if (!fs.exists(source) && !recovery) {
239                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
240                                                      "move, source path [{0}] does not exist", source);
241                }
242    
243                if (!fs.rename(source, target) && !recovery) {
244                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
245                                                      "move, could not move [{0}] to [{1}]", source, target);
246                }
247            }
248            catch (Exception ex) {
249                throw convertException(ex);
250            }
251        }
252    
253        void chmod(Context context, Path path, String permissions, boolean dirFiles) throws ActionExecutorException {
254            try {
255                validatePath(path, true);
256                FileSystem fs = getFileSystemFor(path, context);
257    
258                if (!fs.exists(path)) {
259                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009",
260                                                      "chmod, path [{0}] does not exist", path);
261                }
262    
263                FileStatus pathStatus = fs.getFileStatus(path);
264    
265                Path[] paths;
266                if (dirFiles && pathStatus.isDir()) {
267                    FileStatus[] filesStatus = fs.listStatus(path);
268                    paths = new Path[filesStatus.length];
269                    for (int i = 0; i < filesStatus.length; i++) {
270                        paths[i] = filesStatus[i].getPath();
271                    }
272                }
273                else {
274                    paths = new Path[]{path};
275                }
276    
277                FsPermission newFsPermission = createShortPermission(permissions, path);
278                fs.setPermission(path, newFsPermission);
279                for (Path p : paths) {
280                    fs.setPermission(p, newFsPermission);
281                }
282            }
283            catch (Exception ex) {
284                throw convertException(ex);
285            }
286        }
287    
288        FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
289            if (permissions.length() == 3) {
290                char user = permissions.charAt(0);
291                char group = permissions.charAt(1);
292                char other = permissions.charAt(2);
293                int useri = user - '0';
294                int groupi = group - '0';
295                int otheri = other - '0';
296                int mask = useri * 100 + groupi * 10 + otheri;
297                short omask = Short.parseShort(Integer.toString(mask), 8);
298                return new FsPermission(omask);
299            }
300            else {
301                if (permissions.length() == 10) {
302                    return FsPermission.valueOf(permissions);
303                }
304                else {
305                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
306                                                      "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
307                }
308            }
309        }
310    
311        @Override
312        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
313        }
314    
315        @Override
316        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
317        }
318    
319        @Override
320        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
321            try {
322                context.setStartData("-", "-", "-");
323                Element actionXml = XmlUtils.parseXml(action.getConf());
324                doOperations(context, actionXml);
325                context.setExecutionData("OK", null);
326            }
327            catch (Exception ex) {
328                throw convertException(ex);
329            }
330        }
331    
332        @Override
333        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
334            String externalStatus = action.getExternalStatus();
335            WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
336                                           WorkflowAction.Status.ERROR;
337            context.setEndData(status, getActionSignal(status));
338            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
339                try {
340                    FileSystem fs = context.getAppFileSystem();
341                    fs.delete(context.getActionDir(), true);
342                }
343                catch (Exception ex) {
344                    throw convertException(ex);
345                }
346            }
347        }
348    
349        @Override
350        public boolean isCompleted(String externalStatus) {
351            return true;
352        }
353    
354        /**
355         * @param context
356         * @return
357         * @throws HadoopAccessorException
358         * @throws IOException
359         * @throws URISyntaxException
360         */
361        public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
362            return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
363        }
364    
365    }