001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.oozie.client.CoordinatorJob; 021 import org.apache.oozie.client.Job; 022 import org.apache.oozie.CoordinatorActionBean; 023 import org.apache.oozie.CoordinatorJobBean; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.XException; 026 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 027 import org.apache.oozie.command.wf.KillXCommand; 028 import org.apache.oozie.command.CommandException; 029 import org.apache.oozie.command.KillTransitionXCommand; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor; 032 import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor; 033 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 034 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 035 import org.apache.oozie.executor.jpa.JPAExecutorException; 036 import org.apache.oozie.service.JPAService; 037 import org.apache.oozie.service.Services; 038 import org.apache.oozie.util.LogUtils; 039 import org.apache.oozie.util.ParamChecker; 040 import org.apache.oozie.util.StatusUtils; 041 042 import java.util.Date; 043 import java.util.List; 044 045 public class CoordKillXCommand extends KillTransitionXCommand { 046 047 private final String jobId; 048 private CoordinatorJobBean coordJob; 049 private List<CoordinatorActionBean> actionList; 050 private JPAService jpaService = null; 051 private CoordinatorJob.Status prevStatus = null; 052 053 public CoordKillXCommand(String id) { 054 super("coord_kill", "coord_kill", 2); 055 this.jobId = ParamChecker.notEmpty(id, "id"); 056 } 057 058 @Override 059 protected boolean isLockRequired() { 060 return true; 061 } 062 063 @Override 064 public String getEntityKey() { 065 return this.jobId; 066 } 067 068 @Override 069 protected void loadState() throws CommandException { 070 try { 071 jpaService = Services.get().get(JPAService.class); 072 073 if (jpaService != null) { 074 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 075 //Get actions which are not succeeded, failed, timed out or killed 076 this.actionList = jpaService.execute(new CoordJobGetActionsNotCompletedJPAExecutor(jobId)); 077 prevStatus = coordJob.getStatus(); 078 LogUtils.setLogInfo(coordJob, logInfo); 079 } 080 else { 081 throw new CommandException(ErrorCode.E0610); 082 } 083 } 084 catch (XException ex) { 085 throw new CommandException(ex); 086 } 087 } 088 089 @Override 090 protected void verifyPrecondition() throws CommandException, PreconditionException { 091 // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed 092 if (StatusUtils.isV1CoordjobKillable(coordJob)) { 093 return; 094 } 095 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 096 || coordJob.getStatus() == CoordinatorJob.Status.FAILED 097 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 098 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 099 LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = " 100 + jobId + ", status = " + coordJob.getStatus()); 101 throw new PreconditionException(ErrorCode.E1020, jobId); 102 } 103 } 104 105 private void updateCoordAction(CoordinatorActionBean action, boolean makePending) throws CommandException { 106 action.setStatus(CoordinatorActionBean.Status.KILLED); 107 if (makePending) { 108 action.incrementAndGetPending(); 109 } else { 110 // set pending to false 111 action.setPending(0); 112 } 113 action.setLastModifiedTime(new Date()); 114 try { 115 jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action)); 116 } catch (JPAExecutorException e) { 117 throw new CommandException(e); 118 } 119 } 120 121 @Override 122 public void killChildren() throws CommandException { 123 try { 124 if (actionList != null) { 125 for (CoordinatorActionBean action : actionList) { 126 // queue a WorkflowKillXCommand to delete the workflow job and actions 127 if (action.getExternalId() != null) { 128 queue(new KillXCommand(action.getExternalId())); 129 // As the kill command for children is queued, set pending flag for coord action to be true 130 updateCoordAction(action, true); 131 LOG.debug( 132 "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]", 133 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 134 } 135 else { 136 // As killing children is not required, set pending flag for coord action to be false 137 updateCoordAction(action, false); 138 LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]", 139 action.getId(), action.getStatus(), action.getPending()); 140 } 141 } 142 } 143 144 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 145 146 LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId); 147 } 148 catch (JPAExecutorException ex) { 149 throw new CommandException(ex); 150 } 151 } 152 153 @Override 154 public void notifyParent() throws CommandException { 155 // update bundle action 156 if (coordJob.getBundleId() != null) { 157 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 158 bundleStatusUpdate.call(); 159 } 160 } 161 162 @Override 163 public void updateJob() throws CommandException { 164 try { 165 jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob)); 166 } 167 catch (JPAExecutorException ex) { 168 throw new CommandException(ex); 169 } 170 } 171 172 /* (non-Javadoc) 173 * @see org.apache.oozie.command.TransitionXCommand#getJob() 174 */ 175 @Override 176 public Job getJob() { 177 return coordJob; 178 } 179 180 /* (non-Javadoc) 181 * @see org.apache.oozie.command.XCommand#getKey() 182 */ 183 @Override 184 public String getKey(){ 185 return getName() + "_" + jobId; 186 } 187 188 }