001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.oozie.client.CoordinatorJob; 021 import org.apache.oozie.CoordinatorActionBean; 022 import org.apache.oozie.CoordinatorJobBean; 023 import org.apache.oozie.XException; 024 import org.apache.oozie.command.wf.KillCommand; 025 import org.apache.oozie.command.CommandException; 026 import org.apache.oozie.store.CoordinatorStore; 027 import org.apache.oozie.store.StoreException; 028 import org.apache.oozie.util.ParamChecker; 029 import org.apache.oozie.util.XLog; 030 031 import java.util.Date; 032 import java.util.List; 033 034 public class CoordKillCommand extends CoordinatorCommand<Void> { 035 036 private String jobId; 037 private final XLog log = XLog.getLog(getClass()); 038 039 public CoordKillCommand(String id) { 040 super("coord_kill", "coord_kill", 1, XLog.STD); 041 this.jobId = ParamChecker.notEmpty(id, "id"); 042 } 043 044 protected Void call(CoordinatorStore store) throws StoreException, CommandException { 045 try { 046 // CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, 047 // false); 048 CoordinatorJobBean coordJob = store.getEntityManager().find(CoordinatorJobBean.class, jobId); 049 setLogInfo(coordJob); 050 if (coordJob.getStatus() != CoordinatorJob.Status.SUCCEEDED 051 || coordJob.getStatus() != CoordinatorJob.Status.FAILED) { 052 coordJob.setEndTime(new Date()); 053 incrJobCounter(1); 054 coordJob.setStatus(CoordinatorJob.Status.KILLED); 055 List<CoordinatorActionBean> actionList = store.getActionsForCoordinatorJob(jobId, false); 056 for (CoordinatorActionBean action : actionList) { 057 if (action.getStatus() != CoordinatorActionBean.Status.FAILED 058 && action.getStatus() != CoordinatorActionBean.Status.TIMEDOUT 059 && action.getStatus() != CoordinatorActionBean.Status.SUCCEEDED 060 && action.getStatus() != CoordinatorActionBean.Status.KILLED) { 061 // queue a KillCommand to delete the workflow job 062 if (action.getExternalId() != null) { 063 queueCallable(new KillCommand(action.getExternalId())); 064 } 065 action.setStatus(CoordinatorActionBean.Status.KILLED); 066 store.updateCoordinatorAction(action); 067 } 068 } 069 store.updateCoordinatorJob(coordJob); 070 // TODO queueCallable(new NotificationCommand(coordJob)); 071 } 072 else { 073 log.info("CoordKillCommand not killed - job either " + "finished successfully or does not exist " 074 + jobId); 075 } 076 return null; 077 } 078 catch (XException ex) { 079 throw new CommandException(ex); 080 } 081 } 082 083 @Override 084 protected Void execute(CoordinatorStore store) throws StoreException, CommandException { 085 log.info("STARTED CoordKillCommand for jobId=" + jobId); 086 try { 087 if (lock(jobId)) { 088 call(store); 089 } 090 else { 091 queueCallable(new CoordKillCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL); 092 log.warn("CoordKillCommand lock was not acquired - " + " failed " + jobId + ". Requeing the same."); 093 } 094 } 095 catch (InterruptedException e) { 096 queueCallable(new CoordKillCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL); 097 log.warn("CoordKillCommand lock acquiring failed " + " with exception " + e.getMessage() + " for job id " 098 + jobId + ". Requeing the same."); 099 } 100 finally { 101 log.info("ENDED CoordKillCommand for jobId=" + jobId); 102 } 103 return null; 104 } 105 106 }