001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.oozie.client.CoordinatorJob; 021 import org.apache.oozie.client.Job; 022 import org.apache.oozie.CoordinatorActionBean; 023 import org.apache.oozie.CoordinatorJobBean; 024 import org.apache.oozie.ErrorCode; 025 import org.apache.oozie.XException; 026 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 027 import org.apache.oozie.command.wf.KillXCommand; 028 import org.apache.oozie.command.CommandException; 029 import org.apache.oozie.command.KillTransitionXCommand; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor; 032 import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor; 033 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 034 import org.apache.oozie.executor.jpa.JPAExecutorException; 035 import org.apache.oozie.service.JPAService; 036 import org.apache.oozie.service.Services; 037 import org.apache.oozie.util.LogUtils; 038 import org.apache.oozie.util.ParamChecker; 039 import org.apache.oozie.util.StatusUtils; 040 041 import java.util.Date; 042 import java.util.List; 043 044 public class CoordKillXCommand extends KillTransitionXCommand { 045 046 private final String jobId; 047 private CoordinatorJobBean coordJob; 048 private List<CoordinatorActionBean> actionList; 049 private JPAService jpaService = null; 050 private CoordinatorJob.Status prevStatus = null; 051 052 public CoordKillXCommand(String id) { 053 super("coord_kill", "coord_kill", 2); 054 this.jobId = ParamChecker.notEmpty(id, "id"); 055 } 056 057 @Override 058 protected boolean isLockRequired() { 059 return true; 060 } 061 062 @Override 063 public String getEntityKey() { 064 return this.jobId; 065 } 066 067 @Override 068 protected void loadState() throws CommandException { 069 try { 070 jpaService = Services.get().get(JPAService.class); 071 072 if (jpaService != null) { 073 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 074 //Get actions which are not succeeded, failed, timed out or killed 075 this.actionList = jpaService.execute(new CoordJobGetActionsNotCompletedJPAExecutor(jobId)); 076 prevStatus = coordJob.getStatus(); 077 LogUtils.setLogInfo(coordJob, logInfo); 078 } 079 else { 080 throw new CommandException(ErrorCode.E0610); 081 } 082 } 083 catch (XException ex) { 084 throw new CommandException(ex); 085 } 086 } 087 088 @Override 089 protected void verifyPrecondition() throws CommandException, PreconditionException { 090 // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed 091 if (StatusUtils.isV1CoordjobKillable(coordJob)) { 092 return; 093 } 094 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 095 || coordJob.getStatus() == CoordinatorJob.Status.FAILED 096 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 097 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 098 LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = " 099 + jobId + ", status = " + coordJob.getStatus()); 100 throw new PreconditionException(ErrorCode.E1020, jobId); 101 } 102 } 103 104 private void updateCoordAction(CoordinatorActionBean action, boolean makePending) { 105 action.setStatus(CoordinatorActionBean.Status.KILLED); 106 if (makePending) { 107 action.incrementAndGetPending(); 108 } else { 109 // set pending to false 110 action.setPending(0); 111 } 112 action.setLastModifiedTime(new Date()); 113 updateList.add(action); 114 } 115 116 @Override 117 public void killChildren() throws CommandException { 118 if (actionList != null) { 119 for (CoordinatorActionBean action : actionList) { 120 // queue a WorkflowKillXCommand to delete the workflow job and actions 121 if (action.getExternalId() != null) { 122 queue(new KillXCommand(action.getExternalId())); 123 // As the kill command for children is queued, set pending flag for coord action to be true 124 updateCoordAction(action, true); 125 LOG.debug( 126 "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]", 127 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 128 } 129 else { 130 // As killing children is not required, set pending flag for coord action to be false 131 updateCoordAction(action, false); 132 LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]", 133 action.getId(), action.getStatus(), action.getPending()); 134 } 135 } 136 } 137 138 updateList.add(coordJob); 139 140 LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId); 141 } 142 143 @Override 144 public void notifyParent() throws CommandException { 145 // update bundle action 146 if (coordJob.getBundleId() != null) { 147 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 148 bundleStatusUpdate.call(); 149 } 150 } 151 152 @Override 153 public void updateJob() throws CommandException { 154 updateList.add(coordJob); 155 } 156 157 /* (non-Javadoc) 158 * @see org.apache.oozie.command.KillTransitionXCommand#performWrites() 159 */ 160 @Override 161 public void performWrites() throws CommandException { 162 try { 163 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null)); 164 } 165 catch (JPAExecutorException e) { 166 throw new CommandException(e); 167 } 168 } 169 170 /* (non-Javadoc) 171 * @see org.apache.oozie.command.TransitionXCommand#getJob() 172 */ 173 @Override 174 public Job getJob() { 175 return coordJob; 176 } 177 178 /* (non-Javadoc) 179 * @see org.apache.oozie.command.XCommand#getKey() 180 */ 181 @Override 182 public String getKey(){ 183 return getName() + "_" + jobId; 184 } 185 186 }