This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import java.io.IOException;
021 import java.net.URI;
022 import java.net.URISyntaxException;
023
024 import org.apache.hadoop.conf.Configuration;
025 import org.apache.hadoop.fs.FileSystem;
026 import org.apache.hadoop.fs.Path;
027 import org.apache.oozie.ErrorCode;
028 import org.apache.oozie.client.WorkflowJob;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.service.HadoopAccessorException;
032 import org.apache.oozie.service.HadoopAccessorService;
033 import org.apache.oozie.service.Services;
034
035 /**
036 * This Command is expected to be called when a Workflow moves to any terminal
037 * state ( such as SUCCEEDED, KILLED, FAILED). This class primarily removes the
038 * temporary directory created for specific workflow id
039 */
040 public class WfEndXCommand extends WorkflowXCommand<Void> {
041
042 private WorkflowJob job = null;
043
044 public WfEndXCommand(WorkflowJob job) {
045 super("wf_end", "wf_end", 1);
046 this.job = job;
047 }
048
049 @Override
050 protected Void execute() throws CommandException {
051 LOG.debug("STARTED WFEndXCommand " + job.getId());
052 deleteWFDir();
053 LOG.debug("ENDED WFEndXCommand " + job.getId());
054 return null;
055 }
056
057 private void deleteWFDir() throws CommandException {
058 FileSystem fs;
059 try {
060 fs = getAppFileSystem(job);
061 String wfDir = Services.get().getSystemId() + "/" + job.getId();
062 Path wfDirPath = new Path(fs.getHomeDirectory(), wfDir);
063 LOG.debug("WF tmp dir :" + wfDirPath);
064 if (fs.exists(wfDirPath)) {
065 fs.delete(wfDirPath, true);
066 }
067 else {
068 LOG.debug("Tmp dir doesn't exist :" + wfDirPath);
069 }
070 }
071 catch (Exception e) {
072 LOG.error("Unable to delete WF temp dir of wf id :" + job.getId(), e);
073 throw new CommandException(ErrorCode.E0819, job.getId(), e);
074 }
075
076 }
077
078 protected FileSystem getAppFileSystem(WorkflowJob workflow) throws HadoopAccessorException, IOException,
079 URISyntaxException {
080 URI uri = new URI(workflow.getAppPath());
081 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
082 Configuration fsConf = has.createJobConf(uri.getAuthority());
083 return has.createFileSystem(workflow.getUser(), uri, fsConf);
084 }
085
086 @Override
087 public String getEntityKey() {
088 return job.getId();
089 }
090
091 @Override
092 protected boolean isLockRequired() {
093 return false;
094 }
095
096 @Override
097 protected void loadState() throws CommandException {
098
099 }
100
101 @Override
102 protected void verifyPrecondition() throws CommandException, PreconditionException {
103
104 }
105
106 }