This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.IOException;
021 import java.net.URISyntaxException;
022 import java.util.List;
023
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.hadoop.fs.FileStatus;
026 import org.apache.hadoop.fs.FileSystem;
027 import org.apache.hadoop.fs.Path;
028 import org.apache.hadoop.fs.permission.FsPermission;
029 import org.apache.oozie.action.ActionExecutor;
030 import org.apache.oozie.action.ActionExecutorException;
031 import org.apache.oozie.client.WorkflowAction;
032 import org.apache.oozie.service.HadoopAccessorException;
033 import org.apache.oozie.service.HadoopAccessorService;
034 import org.apache.oozie.service.Services;
035 import org.apache.oozie.util.XmlUtils;
036 import org.jdom.Element;
037
038 /**
039 * File system action executor. <p/> This executes the file system mkdir, move and delete commands
040 */
041 public class FsActionExecutor extends ActionExecutor {
042
043 public FsActionExecutor() {
044 super("fs");
045 }
046
047 Path getPath(Element element, String attribute) {
048 String str = element.getAttributeValue(attribute).trim();
049 return new Path(str);
050 }
051
052 void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
053 String scheme = path.toUri().getScheme();
054 if (withScheme) {
055 if (scheme == null) {
056 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
057 "Missing scheme in path [{0}]", path);
058 }
059 else {
060 if (!scheme.equals("hdfs")) {
061 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
062 "Scheme [{0}] not supported in path [{1}]", scheme, path);
063 }
064 }
065 }
066 else {
067 if (scheme != null) {
068 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS003",
069 "Scheme [{0}] not allowed in path [{1}]", scheme, path);
070 }
071 }
072 }
073
074 void validateSameNN(Path source, Path dest) throws ActionExecutorException {
075 Path destPath = new Path(source, dest);
076 String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
077 String s = source.toUri().getScheme() + source.toUri().getAuthority();
078
079 //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
080 if(!t.equals(s)) {
081 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
082 "move, target NN URI different from that of source", dest);
083 }
084 }
085
086 @SuppressWarnings("unchecked")
087 void doOperations(Context context, Element element) throws ActionExecutorException {
088 try {
089 FileSystem fs = context.getAppFileSystem();
090 boolean recovery = fs.exists(getRecoveryPath(context));
091 if (!recovery) {
092 fs.mkdirs(getRecoveryPath(context));
093 }
094 for (Element commandElement : (List<Element>) element.getChildren()) {
095 String command = commandElement.getName();
096 if (command.equals("mkdir")) {
097 Path path = getPath(commandElement, "path");
098 mkdir(context, path);
099 }
100 else {
101 if (command.equals("delete")) {
102 Path path = getPath(commandElement, "path");
103 delete(context, path);
104 }
105 else {
106 if (command.equals("move")) {
107 Path source = getPath(commandElement, "source");
108 Path target = getPath(commandElement, "target");
109 move(context, source, target, recovery);
110 }
111 else {
112 if (command.equals("chmod")) {
113 Path path = getPath(commandElement, "path");
114 String str = commandElement.getAttributeValue("dir-files");
115 boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
116 String permissionsMask = commandElement.getAttributeValue("permissions").trim();
117 chmod(context, path, permissionsMask, dirFiles);
118 }
119 }
120 }
121 }
122 }
123 }
124 catch (Exception ex) {
125 throw convertException(ex);
126 }
127 }
128
129 /**
130 * @param path
131 * @param context
132 * @return FileSystem
133 * @throws HadoopAccessorException
134 */
135 private FileSystem getFileSystemFor(Path path, Context context) throws HadoopAccessorException {
136 String user = context.getWorkflow().getUser();
137 String group = context.getWorkflow().getGroup();
138 return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(),
139 new Configuration());
140 }
141
142 /**
143 * @param path
144 * @param user
145 * @param group
146 * @return FileSystem
147 * @throws HadoopAccessorException
148 */
149 private FileSystem getFileSystemFor(Path path, String user, String group) throws HadoopAccessorException {
150 return Services.get().get(HadoopAccessorService.class).createFileSystem(user, group, path.toUri(),
151 new Configuration());
152 }
153
154 void mkdir(Context context, Path path) throws ActionExecutorException {
155 try {
156 validatePath(path, true);
157 FileSystem fs = getFileSystemFor(path, context);
158
159 if (!fs.exists(path)) {
160 if (!fs.mkdirs(path)) {
161 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
162 "mkdir, path [{0}] could not create directory", path);
163 }
164 }
165 }
166 catch (Exception ex) {
167 throw convertException(ex);
168 }
169 }
170
171 /**
172 * Delete path
173 *
174 * @param context
175 * @param path
176 * @throws ActionExecutorException
177 */
178 public void delete(Context context, Path path) throws ActionExecutorException {
179 try {
180 validatePath(path, true);
181 FileSystem fs = getFileSystemFor(path, context);
182
183 if (fs.exists(path)) {
184 if (!fs.delete(path, true)) {
185 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
186 "delete, path [{0}] could not delete path", path);
187 }
188 }
189 }
190 catch (Exception ex) {
191 throw convertException(ex);
192 }
193 }
194
195 /**
196 * Delete path
197 *
198 * @param user
199 * @param group
200 * @param path
201 * @throws ActionExecutorException
202 */
203 public void delete(String user, String group, Path path) throws ActionExecutorException {
204 try {
205 validatePath(path, true);
206 FileSystem fs = getFileSystemFor(path, user, group);
207
208 if (fs.exists(path)) {
209 if (!fs.delete(path, true)) {
210 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
211 "delete, path [{0}] could not delete path", path);
212 }
213 }
214 }
215 catch (Exception ex) {
216 throw convertException(ex);
217 }
218 }
219
220 /**
221 * Move source to target
222 *
223 * @param context
224 * @param source
225 * @param target
226 * @param recovery
227 * @throws ActionExecutorException
228 */
229 public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
230 try {
231 validatePath(source, true);
232 validateSameNN(source, target);
233 FileSystem fs = getFileSystemFor(source, context);
234
235 if (!fs.exists(source) && !recovery) {
236 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
237 "move, source path [{0}] does not exist", source);
238 }
239
240 if (!fs.rename(source, target) && !recovery) {
241 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
242 "move, could not move [{0}] to [{1}]", source, target);
243 }
244 }
245 catch (Exception ex) {
246 throw convertException(ex);
247 }
248 }
249
250 void chmod(Context context, Path path, String permissions, boolean dirFiles) throws ActionExecutorException {
251 try {
252 validatePath(path, true);
253 FileSystem fs = getFileSystemFor(path, context);
254
255 if (!fs.exists(path)) {
256 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009",
257 "chmod, path [{0}] does not exist", path);
258 }
259
260 FileStatus pathStatus = fs.getFileStatus(path);
261
262 Path[] paths;
263 if (dirFiles && pathStatus.isDir()) {
264 FileStatus[] filesStatus = fs.listStatus(path);
265 paths = new Path[filesStatus.length];
266 for (int i = 0; i < filesStatus.length; i++) {
267 paths[i] = filesStatus[i].getPath();
268 }
269 }
270 else {
271 paths = new Path[]{path};
272 }
273
274 FsPermission newFsPermission = createShortPermission(permissions, path);
275 fs.setPermission(path, newFsPermission);
276 for (Path p : paths) {
277 fs.setPermission(p, newFsPermission);
278 }
279 }
280 catch (Exception ex) {
281 throw convertException(ex);
282 }
283 }
284
285 FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
286 if (permissions.length() == 3) {
287 char user = permissions.charAt(0);
288 char group = permissions.charAt(1);
289 char other = permissions.charAt(2);
290 int useri = user - '0';
291 int groupi = group - '0';
292 int otheri = other - '0';
293 int mask = useri * 100 + groupi * 10 + otheri;
294 short omask = Short.parseShort(Integer.toString(mask), 8);
295 return new FsPermission(omask);
296 }
297 else {
298 if (permissions.length() == 10) {
299 return FsPermission.valueOf(permissions);
300 }
301 else {
302 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
303 "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
304 }
305 }
306 }
307
308 @Override
309 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
310 }
311
312 @Override
313 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
314 }
315
316 @Override
317 public void start(Context context, WorkflowAction action) throws ActionExecutorException {
318 try {
319 context.setStartData("-", "-", "-");
320 Element actionXml = XmlUtils.parseXml(action.getConf());
321 doOperations(context, actionXml);
322 context.setExecutionData("OK", null);
323 }
324 catch (Exception ex) {
325 throw convertException(ex);
326 }
327 }
328
329 @Override
330 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
331 String externalStatus = action.getExternalStatus();
332 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
333 WorkflowAction.Status.ERROR;
334 context.setEndData(status, getActionSignal(status));
335 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
336 try {
337 FileSystem fs = context.getAppFileSystem();
338 fs.delete(context.getActionDir(), true);
339 }
340 catch (Exception ex) {
341 throw convertException(ex);
342 }
343 }
344 }
345
346 @Override
347 public boolean isCompleted(String externalStatus) {
348 return true;
349 }
350
351 /**
352 * @param context
353 * @return
354 * @throws HadoopAccessorException
355 * @throws IOException
356 * @throws URISyntaxException
357 */
358 public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
359 return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
360 }
361
362 }