/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.bundle;

import java.util.Date;
import java.util.List;

import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.KillTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;

/**
 * Kill command for a bundle job.
 * <p>
 * The command loads the bundle job and its bundle actions, refuses to proceed when the job is already
 * SUCCEEDED, FAILED, DONEWITHERROR or KILLED, queues a {@link CoordKillXCommand} for every bundle action
 * that has a coordinator id, and writes the touched job and actions back through the {@link JPAService}
 * in a single {@link BulkUpdateInsertJPAExecutor} call.
 */
public class BundleKillXCommand extends KillTransitionXCommand {
    /** Id of the bundle job to kill; also used as the entity key of this command. */
    private final String jobId;
    /** Bundle job bean, populated by {@link #loadState()}. */
    private BundleJobBean bundleJob;
    /** Bundle actions of the job, populated by {@link #loadState()}. */
    private List<BundleActionBean> bundleActions;
    /** JPA service resolved in {@link #loadState()} and reused by {@link #performWrites()}. */
    private JPAService jpaService = null;

    /**
     * Create a kill command for the given bundle job.
     *
     * @param jobId bundle job id; checked with {@link ParamChecker#notEmpty(String, String)}
     */
    public BundleKillXCommand(String jobId) {
        super("bundle_kill", "bundle_kill", 1);
        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
    }

    /**
     * Return the bundle job id as the entity key.
     *
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /**
     * This command always requires a lock.
     *
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Resolve the JPA service and load the bundle job and its actions.
     *
     * @throws CommandException with {@link ErrorCode#E0610} when no {@link JPAService} is registered, or
     *         wrapping any {@link XException} raised while loading
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    public void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);

        // Guard outside the try block so that E0610 is reported directly instead of being caught by the
        // XException handler below and wrapped in a second CommandException.
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }

        try {
            this.bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId));
            this.bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId));
            LogUtils.setLogInfo(bundleJob, logInfo);
            super.setJob(bundleJob);
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Reject the kill when the bundle job has already reached one of the states SUCCEEDED, FAILED,
     * DONEWITHERROR or KILLED.
     *
     * @throws PreconditionException with {@link ErrorCode#E1020} when the job is in one of those states
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (bundleJob.getStatus() == Job.Status.SUCCEEDED
                || bundleJob.getStatus() == Job.Status.FAILED
                || bundleJob.getStatus() == Job.Status.DONEWITHERROR
                || bundleJob.getStatus() == Job.Status.KILLED) {
            LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = "
                    + jobId + ", status = " + bundleJob.getStatus());
            throw new PreconditionException(ErrorCode.E1020, jobId);
        }
    }

    /**
     * Queue a {@link CoordKillXCommand} for every bundle action that has a coordinator id and register
     * every bundle action for update. Does nothing but log when no actions were loaded.
     *
     * @see org.apache.oozie.command.KillTransitionXCommand#killChildren()
     */
    @Override
    public void killChildren() throws CommandException {
        if (bundleActions != null) {
            for (BundleActionBean action : bundleActions) {
                if (action.getCoordId() != null) {
                    queue(new CoordKillXCommand(action.getCoordId()));
                    updateBundleAction(action);
                    LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]",
                            action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId());
                }
                else {
                    // No coordinator id on this action, so there is no coordinator job to kill.
                    updateBundleAction(action);
                    LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]",
                            action.getBundleActionId(), action.getStatus(), action.getPending());
                }
            }
        }
        LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId);
    }

    /**
     * Stamp the action with the current time and add it to the update list. When the action is not yet
     * in a terminal status, its pending counter is incremented and its status is set to KILLED first.
     *
     * @param action bundle action to update
     */
    private void updateBundleAction(BundleActionBean action) {
        action.setLastModifiedTime(new Date());
        if (!action.isTerminalStatus()) {
            action.incrementAndGetPending();
            action.setStatus(Job.Status.KILLED);
        }
        updateList.add(action);
    }

    /**
     * No-op: this command performs no parent notification.
     *
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() {
    }

    /**
     * Return the bundle job loaded by {@link #loadState()}.
     *
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return bundleJob;
    }

    /**
     * Register the bundle job in the update list so that it is written by {@link #performWrites()}.
     *
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() {
        updateList.add(bundleJob);
    }

    /**
     * Write every bean collected in the update list in one bulk JPA call; nothing is passed as the
     * executor's second argument.
     *
     * @throws CommandException wrapping any {@link JPAExecutorException} raised by the bulk update
     * @see org.apache.oozie.command.KillTransitionXCommand#performWrites()
     */
    @Override
    public void performWrites() throws CommandException {
        try {
            jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

}