This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.IOException;
021 import java.net.URISyntaxException;
022 import java.util.List;
023
024 import org.apache.hadoop.fs.FileStatus;
025 import org.apache.hadoop.fs.FileSystem;
026 import org.apache.hadoop.fs.Path;
027 import org.apache.hadoop.fs.permission.FsPermission;
028 import org.apache.hadoop.mapred.JobConf;
029 import org.apache.oozie.action.ActionExecutor;
030 import org.apache.oozie.action.ActionExecutorException;
031 import org.apache.oozie.client.WorkflowAction;
032 import org.apache.oozie.service.HadoopAccessorException;
033 import org.apache.oozie.service.HadoopAccessorService;
034 import org.apache.oozie.service.Services;
035 import org.apache.oozie.util.XConfiguration;
036 import org.apache.oozie.util.XmlUtils;
037 import org.jdom.Element;
038
039 /**
040 * File system action executor. <p/> This executes the file system mkdir, move and delete commands
041 */
042 public class FsActionExecutor extends ActionExecutor {
043
044 public FsActionExecutor() {
045 super("fs");
046 }
047
048 Path getPath(Element element, String attribute) {
049 String str = element.getAttributeValue(attribute).trim();
050 return new Path(str);
051 }
052
053 void validatePath(Path path, boolean withScheme) throws ActionExecutorException {
054 String scheme = path.toUri().getScheme();
055 if (withScheme) {
056 if (scheme == null) {
057 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS001",
058 "Missing scheme in path [{0}]", path);
059 }
060 else {
061 if (!scheme.equals("hdfs")) {
062 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS002",
063 "Scheme [{0}] not supported in path [{1}]", scheme, path);
064 }
065 }
066 }
067 else {
068 if (scheme != null) {
069 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS003",
070 "Scheme [{0}] not allowed in path [{1}]", scheme, path);
071 }
072 }
073 }
074
075 void validateSameNN(Path source, Path dest) throws ActionExecutorException {
076 Path destPath = new Path(source, dest);
077 String t = destPath.toUri().getScheme() + destPath.toUri().getAuthority();
078 String s = source.toUri().getScheme() + source.toUri().getAuthority();
079
080 //checking whether NN prefix of source and target is same. can modify this to adjust for a set of multiple whitelisted NN
081 if(!t.equals(s)) {
082 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS007",
083 "move, target NN URI different from that of source", dest);
084 }
085 }
086
087 @SuppressWarnings("unchecked")
088 void doOperations(Context context, Element element) throws ActionExecutorException {
089 try {
090 FileSystem fs = context.getAppFileSystem();
091 boolean recovery = fs.exists(getRecoveryPath(context));
092 if (!recovery) {
093 fs.mkdirs(getRecoveryPath(context));
094 }
095 for (Element commandElement : (List<Element>) element.getChildren()) {
096 String command = commandElement.getName();
097 if (command.equals("mkdir")) {
098 Path path = getPath(commandElement, "path");
099 mkdir(context, path);
100 }
101 else {
102 if (command.equals("delete")) {
103 Path path = getPath(commandElement, "path");
104 delete(context, path);
105 }
106 else {
107 if (command.equals("move")) {
108 Path source = getPath(commandElement, "source");
109 Path target = getPath(commandElement, "target");
110 move(context, source, target, recovery);
111 }
112 else {
113 if (command.equals("chmod")) {
114 Path path = getPath(commandElement, "path");
115 String str = commandElement.getAttributeValue("dir-files");
116 boolean dirFiles = (str == null) || Boolean.parseBoolean(str);
117 String permissionsMask = commandElement.getAttributeValue("permissions").trim();
118 chmod(context, path, permissionsMask, dirFiles);
119 }
120 }
121 }
122 }
123 }
124 }
125 catch (Exception ex) {
126 throw convertException(ex);
127 }
128 }
129
130 /**
131 * @param path
132 * @param context
133 * @return FileSystem
134 * @throws HadoopAccessorException
135 */
136 private FileSystem getFileSystemFor(Path path, Context context) throws HadoopAccessorException {
137 String user = context.getWorkflow().getUser();
138 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
139 JobConf conf = has.createJobConf(path.toUri().getAuthority());
140 XConfiguration.copy(context.getProtoActionConf(), conf);
141 return has.createFileSystem(user, path.toUri(), conf);
142 }
143
144 /**
145 * @param path
146 * @param user
147 * @param group
148 * @return FileSystem
149 * @throws HadoopAccessorException
150 */
151 private FileSystem getFileSystemFor(Path path, String user) throws HadoopAccessorException {
152 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
153 JobConf jobConf = has.createJobConf(path.toUri().getAuthority());
154 return has.createFileSystem(user, path.toUri(), jobConf);
155 }
156
157 void mkdir(Context context, Path path) throws ActionExecutorException {
158 try {
159 validatePath(path, true);
160 FileSystem fs = getFileSystemFor(path, context);
161
162 if (!fs.exists(path)) {
163 if (!fs.mkdirs(path)) {
164 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS004",
165 "mkdir, path [{0}] could not create directory", path);
166 }
167 }
168 }
169 catch (Exception ex) {
170 throw convertException(ex);
171 }
172 }
173
174 /**
175 * Delete path
176 *
177 * @param context
178 * @param path
179 * @throws ActionExecutorException
180 */
181 public void delete(Context context, Path path) throws ActionExecutorException {
182 try {
183 validatePath(path, true);
184 FileSystem fs = getFileSystemFor(path, context);
185
186 if (fs.exists(path)) {
187 if (!fs.delete(path, true)) {
188 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
189 "delete, path [{0}] could not delete path", path);
190 }
191 }
192 }
193 catch (Exception ex) {
194 throw convertException(ex);
195 }
196 }
197
198 /**
199 * Delete path
200 *
201 * @param user
202 * @param group
203 * @param path
204 * @throws ActionExecutorException
205 */
206 public void delete(String user, String group, Path path) throws ActionExecutorException {
207 try {
208 validatePath(path, true);
209 FileSystem fs = getFileSystemFor(path, user);
210
211 if (fs.exists(path)) {
212 if (!fs.delete(path, true)) {
213 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS005",
214 "delete, path [{0}] could not delete path", path);
215 }
216 }
217 }
218 catch (Exception ex) {
219 throw convertException(ex);
220 }
221 }
222
223 /**
224 * Move source to target
225 *
226 * @param context
227 * @param source
228 * @param target
229 * @param recovery
230 * @throws ActionExecutorException
231 */
232 public void move(Context context, Path source, Path target, boolean recovery) throws ActionExecutorException {
233 try {
234 validatePath(source, true);
235 validateSameNN(source, target);
236 FileSystem fs = getFileSystemFor(source, context);
237
238 if (!fs.exists(source) && !recovery) {
239 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS006",
240 "move, source path [{0}] does not exist", source);
241 }
242
243 if (!fs.rename(source, target) && !recovery) {
244 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS008",
245 "move, could not move [{0}] to [{1}]", source, target);
246 }
247 }
248 catch (Exception ex) {
249 throw convertException(ex);
250 }
251 }
252
253 void chmod(Context context, Path path, String permissions, boolean dirFiles) throws ActionExecutorException {
254 try {
255 validatePath(path, true);
256 FileSystem fs = getFileSystemFor(path, context);
257
258 if (!fs.exists(path)) {
259 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS009",
260 "chmod, path [{0}] does not exist", path);
261 }
262
263 FileStatus pathStatus = fs.getFileStatus(path);
264
265 Path[] paths;
266 if (dirFiles && pathStatus.isDir()) {
267 FileStatus[] filesStatus = fs.listStatus(path);
268 paths = new Path[filesStatus.length];
269 for (int i = 0; i < filesStatus.length; i++) {
270 paths[i] = filesStatus[i].getPath();
271 }
272 }
273 else {
274 paths = new Path[]{path};
275 }
276
277 FsPermission newFsPermission = createShortPermission(permissions, path);
278 fs.setPermission(path, newFsPermission);
279 for (Path p : paths) {
280 fs.setPermission(p, newFsPermission);
281 }
282 }
283 catch (Exception ex) {
284 throw convertException(ex);
285 }
286 }
287
288 FsPermission createShortPermission(String permissions, Path path) throws ActionExecutorException {
289 if (permissions.length() == 3) {
290 char user = permissions.charAt(0);
291 char group = permissions.charAt(1);
292 char other = permissions.charAt(2);
293 int useri = user - '0';
294 int groupi = group - '0';
295 int otheri = other - '0';
296 int mask = useri * 100 + groupi * 10 + otheri;
297 short omask = Short.parseShort(Integer.toString(mask), 8);
298 return new FsPermission(omask);
299 }
300 else {
301 if (permissions.length() == 10) {
302 return FsPermission.valueOf(permissions);
303 }
304 else {
305 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FS010",
306 "chmod, path [{0}] invalid permissions mask [{1}]", path, permissions);
307 }
308 }
309 }
310
311 @Override
312 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
313 }
314
315 @Override
316 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
317 }
318
319 @Override
320 public void start(Context context, WorkflowAction action) throws ActionExecutorException {
321 try {
322 context.setStartData("-", "-", "-");
323 Element actionXml = XmlUtils.parseXml(action.getConf());
324 doOperations(context, actionXml);
325 context.setExecutionData("OK", null);
326 }
327 catch (Exception ex) {
328 throw convertException(ex);
329 }
330 }
331
332 @Override
333 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
334 String externalStatus = action.getExternalStatus();
335 WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
336 WorkflowAction.Status.ERROR;
337 context.setEndData(status, getActionSignal(status));
338 if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
339 try {
340 FileSystem fs = context.getAppFileSystem();
341 fs.delete(context.getActionDir(), true);
342 }
343 catch (Exception ex) {
344 throw convertException(ex);
345 }
346 }
347 }
348
349 @Override
350 public boolean isCompleted(String externalStatus) {
351 return true;
352 }
353
354 /**
355 * @param context
356 * @return
357 * @throws HadoopAccessorException
358 * @throws IOException
359 * @throws URISyntaxException
360 */
361 public Path getRecoveryPath(Context context) throws HadoopAccessorException, IOException, URISyntaxException {
362 return new Path(context.getActionDir(), "fs-" + context.getRecoveryId());
363 }
364
365 }