001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import java.io.IOException; 022import java.net.URI; 023import java.net.URISyntaxException; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.client.WorkflowJob; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.PreconditionException; 032import org.apache.oozie.service.HadoopAccessorException; 033import org.apache.oozie.service.HadoopAccessorService; 034import org.apache.oozie.service.Services; 035 036/** 037 * This Command is expected to be called when a Workflow moves to any terminal 038 * state ( such as SUCCEEDED, KILLED, FAILED). This class primarily removes the 039 * temporary directory created for specific workflow id 040 */ 041public class WfEndXCommand extends WorkflowXCommand<Void> { 042 043 private WorkflowJob job = null; 044 045 public WfEndXCommand(WorkflowJob job) { 046 super("wf_end", "wf_end", 1); 047 this.job = job; 048 } 049 050 @Override 051 protected Void execute() throws CommandException { 052 LOG.debug("STARTED WFEndXCommand " + job.getId()); 053 deleteWFDir(); 054 LOG.debug("ENDED WFEndXCommand " + job.getId()); 055 return null; 056 } 057 058 private void deleteWFDir() throws CommandException { 059 FileSystem fs; 060 try { 061 fs = getAppFileSystem(job); 062 String wfDir = Services.get().getSystemId() + "/" + job.getId(); 063 Path wfDirPath = new Path(fs.getHomeDirectory(), wfDir); 064 LOG.debug("WF tmp dir :" + wfDirPath); 065 if (fs.exists(wfDirPath)) { 066 fs.delete(wfDirPath, true); 067 } 068 else { 069 LOG.debug("Tmp dir doesn't exist :" + wfDirPath); 070 } 071 } 072 catch (Exception e) { 073 LOG.error("Unable to delete WF temp dir of wf id :" + job.getId(), e); 074 throw new CommandException(ErrorCode.E0819, job.getId(), e); 075 } 076 077 } 078 079 protected FileSystem getAppFileSystem(WorkflowJob workflow) throws HadoopAccessorException, IOException, 080 URISyntaxException { 081 URI uri = new URI(workflow.getAppPath()); 082 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 083 Configuration fsConf = has.createJobConf(uri.getAuthority()); 084 return has.createFileSystem(workflow.getUser(), uri, fsConf); 085 } 086 087 @Override 088 public String getEntityKey() { 089 return job.getId(); 090 } 091 092 @Override 093 protected boolean isLockRequired() { 094 return false; 095 } 096 097 @Override 098 protected void loadState() throws CommandException { 099 100 } 101 102 @Override 103 protected void verifyPrecondition() throws CommandException, PreconditionException { 104 105 } 106 107}