/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.KillTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;

import java.util.Date;
import java.util.List;

/**
 * Kill command for a coordinator job.
 * <p>
 * This class loads the coordinator job and its actions, marks every action that has not
 * already reached a terminal status as KILLED, queues a {@link KillXCommand} for each such
 * action that owns a workflow, persists the job, and notifies the owning bundle (if any).
 * <p>
 * NOTE(review): the job's own status transition and the order in which the hooks below are
 * invoked are driven by {@link KillTransitionXCommand}, which is not visible from this file.
 */
public class CoordKillXCommand extends KillTransitionXCommand {

    private final String jobId;
    private CoordinatorJobBean coordJob;
    private List<CoordinatorActionBean> actionList;
    private JPAService jpaService = null;
    private CoordinatorJob.Status prevStatus = null;

    /**
     * Creates a kill command for the given coordinator job.
     *
     * @param id coordinator job id; validated with {@link ParamChecker#notEmpty}
     */
    public CoordKillXCommand(String id) {
        // NOTE(review): the trailing 2 is presumably the command priority - confirm against the superclass.
        super("coord_kill", "coord_kill", 2);
        this.jobId = ParamChecker.notEmpty(id, "id");
    }

    /**
     * @return always {@code true}
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * @return the coordinator job id this command operates on
     */
    @Override
    protected String getEntityKey() {
        return this.jobId;
    }

    /**
     * Loads the coordinator job bean and its actions through the {@link JPAService}, remembers
     * the job status as it was at load time and sets up the log info for the job.
     *
     * @throws CommandException with {@link ErrorCode#E0610} when no {@link JPAService} is
     *         registered, or wrapping any {@link XException} raised while loading
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);

            if (jpaService != null) {
                this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
                this.actionList = jpaService.execute(new CoordJobGetActionsJPAExecutor(jobId));
                // Captured before any change so notifyParent() can report the previous status.
                prevStatus = coordJob.getStatus();
                LogUtils.setLogInfo(coordJob, logInfo);
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Rejects the kill when the job already finished as SUCCEEDED, FAILED or DONEWITHERROR,
     * unless {@link StatusUtils#isV1CoordjobKillable} allows it.
     *
     * @throws PreconditionException with {@link ErrorCode#E1020} when the job is in one of the
     *         finished statuses listed above
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed
        if (StatusUtils.isV1CoordjobKillable(coordJob)) {
            return;
        }
        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
                || coordJob.getStatus() == CoordinatorJob.Status.FAILED
                || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR) {
            LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED or DONEWITHERROR, job id = "
                    + jobId + ", status = " + coordJob.getStatus());
            throw new PreconditionException(ErrorCode.E1020, jobId);
        }
    }

    /**
     * Marks a coordinator action as KILLED and persists it.
     *
     * @param action the action to update
     * @param makePending {@code true} when a workflow kill has been queued for this action, in
     *        which case the pending counter is incremented; {@code false} when there is no
     *        workflow to kill, in which case the counter is reset to 0 because no queued
     *        command exists that could clear it afterwards
     * @throws CommandException wrapping any {@link JPAExecutorException} from the update
     */
    private void updateCoordAction(CoordinatorActionBean action, boolean makePending) throws CommandException {
        action.setStatus(CoordinatorActionBean.Status.KILLED);
        if (makePending) {
            action.incrementAndGetPending();
        }
        else {
            // Nothing was queued for this action, so it must not be left pending.
            action.setPending(0);
        }
        action.setLastModifiedTime(new Date());
        try {
            jpaService.execute(new CoordActionUpdateJPAExecutor(action));
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /**
     * Tells whether an action still needs to be killed.
     *
     * @param action the action to inspect
     * @return {@code false} when the action is already FAILED, TIMEDOUT, SUCCEEDED or KILLED
     */
    private static boolean isKillable(CoordinatorActionBean action) {
        return action.getStatus() != CoordinatorActionBean.Status.FAILED
                && action.getStatus() != CoordinatorActionBean.Status.TIMEDOUT
                && action.getStatus() != CoordinatorActionBean.Status.SUCCEEDED
                && action.getStatus() != CoordinatorActionBean.Status.KILLED;
    }

    /**
     * Kills every loaded action that is not yet in a terminal status and then persists the
     * coordinator job.
     * <p>
     * Actions with an external (workflow) id get a {@link KillXCommand} queued and are marked
     * pending; actions without one are only marked KILLED and are not left pending.
     *
     * @throws CommandException wrapping any {@link JPAExecutorException} from persistence
     */
    @Override
    public void killChildren() throws CommandException {
        try {
            if (actionList != null) {
                for (CoordinatorActionBean action : actionList) {
                    if (!isKillable(action)) {
                        continue;
                    }
                    // queue a WorkflowKillXCommand to delete the workflow job and actions
                    if (action.getExternalId() != null) {
                        queue(new KillXCommand(action.getExternalId()));
                        updateCoordAction(action, true);
                        LOG.debug("Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
                                action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
                    }
                    else {
                        updateCoordAction(action, false);
                        LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]", action.getId(),
                                action.getStatus(), action.getPending());
                    }
                }
            }
            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));

            LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
        }
        catch (JPAExecutorException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Runs a {@link BundleStatusUpdateXCommand} with the job and its previously recorded
     * status when the coordinator job has a bundle id; does nothing otherwise.
     *
     * @throws CommandException if the bundle status update fails
     */
    @Override
    public void notifyParent() throws CommandException {
        // update bundle action
        if (coordJob.getBundleId() != null) {
            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
            bundleStatusUpdate.call();
        }
    }

    /**
     * Persists the coordinator job bean.
     *
     * @throws CommandException wrapping any {@link JPAExecutorException} from the update
     */
    @Override
    public void updateJob() throws CommandException {
        try {
            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
        }
        catch (JPAExecutorException ex) {
            throw new CommandException(ex);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.command.XCommand#getKey()
     */
    @Override
    public String getKey(){
        return getName() + "_" + jobId;
    }

}