/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.bundle;

import java.util.Date;
import java.util.List;

import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.KillTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;

/**
 * Kill command for a bundle job.
 * <p>
 * The command loads the bundle job and its bundle actions, refuses to proceed when the job is
 * already SUCCEEDED, FAILED, DONEWITHERROR or KILLED, and otherwise marks every non-terminal bundle
 * action as KILLED, queuing a {@link CoordKillXCommand} for each action that carries a coordinator
 * id. All touched beans are collected in {@code updateList} and written in one bulk update.
 */
public class BundleKillXCommand extends KillTransitionXCommand {
    /** Literal passed as both of the first two arguments of the super constructor. */
    private static final String BUNDLE_KILL = "bundle_kill";

    private final String jobId;
    private BundleJobBean bundleJob;
    private List<BundleActionBean> bundleActions;
    // Assigned in loadState(); stays null until then.
    private JPAService jpaService;

    /**
     * Creates a kill command for the given bundle job.
     *
     * @param jobId id of the bundle job; validated with {@code ParamChecker.notEmpty}
     */
    public BundleKillXCommand(String jobId) {
        // NOTE(review): the trailing 1 is presumably the command priority - confirm against
        // KillTransitionXCommand.
        super(BUNDLE_KILL, BUNDLE_KILL, 1);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    }

    /**
     * Returns the bundle job id as the entity key.
     *
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /**
     * Returns the command name and the bundle job id joined by an underscore.
     */
    @Override
    public String getKey() {
        final String commandName = getName();
        return commandName + "_" + jobId;
    }

    /**
     * Always reports that this command requires a lock.
     *
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Looks up the JPA service and uses it to load the bundle job and its bundle actions, then
     * registers the job for logging and with the superclass.
     *
     * @throws CommandException with {@code ErrorCode.E0610} when no JPA service is registered, or
     *         wrapping any {@link XException} raised while loading
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    public void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                // Deliberately thrown inside the try block, exactly as before.
                // NOTE(review): if CommandException is an XException subtype, the catch below
                // re-wraps this exception - confirm.
                throw new CommandException(ErrorCode.E0610);
            }

            this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
            this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
            LogUtils.setLogInfo(bundleJob, logInfo);
            super.setJob(bundleJob);
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Rejects the kill request when the bundle job has already reached one of the statuses
     * SUCCEEDED, FAILED, DONEWITHERROR or KILLED.
     *
     * @throws PreconditionException with {@code ErrorCode.E1020} when the job is in one of those
     *         statuses
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        final Job.Status currentStatus = bundleJob.getStatus();
        final boolean alreadyFinished = currentStatus == Job.Status.SUCCEEDED
                || currentStatus == Job.Status.FAILED
                || currentStatus == Job.Status.DONEWITHERROR
                || currentStatus == Job.Status.KILLED;
        if (!alreadyFinished) {
            return;
        }

        LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
                + jobId + ", status = " + currentStatus);
        throw new PreconditionException(ErrorCode.E1020, jobId);
    }

    /**
     * Processes every loaded bundle action: queues a coordinator kill where the action has a
     * coordinator id and records the updated action for the bulk write.
     *
     * @see org.apache.oozie.command.KillTransitionXCommand#killChildren()
     */
    @Override
    public void killChildren() throws CommandException {
        if (bundleActions != null) {
            for (BundleActionBean bundleAction : bundleActions) {
                killBundleAction(bundleAction);
            }
        }
        LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId);
    }

    /**
     * Handles a single bundle action: queues a {@link CoordKillXCommand} when a coordinator id is
     * present, then updates the action, then logs the outcome.
     *
     * @param bundleAction the bundle action to process
     */
    private void killBundleAction(BundleActionBean bundleAction) throws CommandException {
        final String coordId = bundleAction.getCoordId();
        final boolean hasCoordinator = coordId != null;

        // The coordinator kill is queued before the action bean is modified.
        if (hasCoordinator) {
            queue(new CoordKillXCommand(coordId));
        }
        updateBundleAction(bundleAction);

        if (hasCoordinator) {
            LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]",
                    bundleAction.getBundleActionId(), bundleAction.getStatus(), bundleAction.getPending(), coordId);
        }
        else {
            LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]",
                    bundleAction.getBundleActionId(), bundleAction.getStatus(), bundleAction.getPending());
        }
    }

    /**
     * Stamps the action with the current time and, unless it is already in a terminal status,
     * increments its pending counter and sets its status to KILLED. The action is added to
     * {@code updateList} in either case.
     *
     * @param action the bundle action to update
     */
    private void updateBundleAction(BundleActionBean action) {
        action.setLastModifiedTime(new Date());

        final boolean stillActive = !action.isTerminalStatus();
        if (stillActive) {
            action.incrementAndGetPending();
            action.setStatus(Job.Status.KILLED);
        }

        updateList.add(action);
    }

    /**
     * Intentionally does nothing.
     *
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() {
    }

    /**
     * Returns the bundle job loaded by {@link #loadState()}.
     *
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return bundleJob;
    }

    /**
     * Adds the bundle job to {@code updateList} so it is included in the bulk write.
     *
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() {
        updateList.add(bundleJob);
    }

    /**
     * Writes everything collected in {@code updateList} through a single
     * {@link BulkUpdateInsertJPAExecutor}; the executor's second argument is passed as
     * {@code null}.
     *
     * @throws CommandException wrapping any {@link JPAExecutorException} raised by the write
     * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
     */
    @Override
    public void performWrites() throws CommandException {
        final BulkUpdateInsertJPAExecutor bulkWrite = new BulkUpdateInsertJPAExecutor(updateList, null);
        try {
            jpaService.execute(bulkWrite);
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

}