001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.wf; 019 020 import java.io.IOException; 021 import java.net.URI; 022 import java.net.URISyntaxException; 023 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.hadoop.fs.FileSystem; 026 import org.apache.hadoop.fs.Path; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.client.WorkflowJob; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.service.HadoopAccessorException; 032 import org.apache.oozie.service.HadoopAccessorService; 033 import org.apache.oozie.service.Services; 034 035 /** 036 * This Command is expected to be called when a Workflow moves to any terminal 037 * state ( such as SUCCEEDED, KILLED, FAILED). 
This class primarily removes the temporary directory created for a specific workflow id.
 */
public class WfEndXCommand extends WorkflowXCommand<Void> {

    /** Workflow job whose temporary directory is removed by this command. */
    private final WorkflowJob job;

    /**
     * Create the command for the given workflow job.
     *
     * @param job workflow job whose temporary directory should be deleted
     */
    public WfEndXCommand(WorkflowJob job) {
        super("wf_end", "wf_end", 1);
        this.job = job;
    }

    /**
     * Delete the temporary directory of the workflow job, logging the start and the end of the command.
     *
     * @return always {@code null}
     * @throws CommandException thrown if the temporary directory could not be processed
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.debug("STARTED WFEndXCommand " + job.getId());
        deleteWFDir();
        LOG.debug("ENDED WFEndXCommand " + job.getId());
        return null;
    }

    /**
     * Recursively delete the directory <code>&lt;home dir&gt;/&lt;system id&gt;/&lt;job id&gt;</code> on the file
     * system returned by {@link #getAppFileSystem(WorkflowJob)}, if that directory exists.
     *
     * @throws CommandException with error code E0819 if any step fails; the original exception is passed on as the
     *         cause
     */
    private void deleteWFDir() throws CommandException {
        try {
            FileSystem fs = getAppFileSystem(job);
            String wfDir = Services.get().getSystemId() + "/" + job.getId();
            Path wfDirPath = new Path(fs.getHomeDirectory(), wfDir);
            LOG.debug("WF tmp dir :" + wfDirPath);
            if (!fs.exists(wfDirPath)) {
                LOG.debug("Tmp dir doesn't exist :" + wfDirPath);
            }
            else if (!fs.delete(wfDirPath, true)) {
                // delete() reports failure through its return value; surface it instead of ignoring it,
                // but keep the clean up best-effort as before.
                LOG.warn("Tmp dir could not be deleted :" + wfDirPath);
            }
        }
        catch (Exception e) {
            LOG.error("Unable to delete WF temp dir of wf id :" + job.getId(), e);
            // The trailing exception parameter is used as the cause, so the root failure is not lost.
            throw new CommandException(ErrorCode.E0819, job.getId(), e);
        }
    }

    /**
     * Create a file system for the application path of the given workflow.
     * <p>
     * The configuration is created from the authority of the application path URI and the file system is created
     * on behalf of the workflow user.
     *
     * @param workflow workflow job providing the application path and the user
     * @return the file system created by the {@link HadoopAccessorService}
     * @throws HadoopAccessorException thrown if the file system could not be created
     * @throws IOException declared for implementations that perform file system I/O
     * @throws URISyntaxException thrown if the application path is not a valid URI
     */
    protected FileSystem getAppFileSystem(WorkflowJob workflow) throws HadoopAccessorException, IOException,
            URISyntaxException {
        URI uri = new URI(workflow.getAppPath());
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        Configuration fsConf = has.createJobConf(uri.getAuthority());
        return has.createFileSystem(workflow.getUser(), uri, fsConf);
    }

    /**
     * Return the id of the workflow job as the entity key of this command.
     *
     * @return the workflow job id
     */
    @Override
    public String getEntityKey() {
        return job.getId();
    }

    /**
     * This command does not require a lock.
     *
     * @return always {@code false}
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /**
     * Nothing to load, the workflow job is handed over in the constructor.
     *
     * @throws CommandException never thrown by this implementation
     */
    @Override
    protected void loadState() throws CommandException {
        // intentionally empty
    }

    /**
     * No precondition is verified by this command.
     *
     * @throws CommandException never thrown by this implementation
     * @throws PreconditionException never thrown by this implementation
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        // intentionally empty
    }

}