001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.bundle; 020 021import java.util.Date; 022import java.util.List; 023 024import org.apache.oozie.BundleActionBean; 025import org.apache.oozie.BundleJobBean; 026import org.apache.oozie.CoordinatorJobBean; 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.XException; 029import org.apache.oozie.client.Job; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.KillTransitionXCommand; 032import org.apache.oozie.command.PreconditionException; 033import org.apache.oozie.command.coord.CoordKillXCommand; 034import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 035import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 036import org.apache.oozie.executor.jpa.BatchQueryExecutor; 037import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 038import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 039import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 041import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 042import org.apache.oozie.executor.jpa.JPAExecutorException; 043import org.apache.oozie.util.LogUtils; 044import org.apache.oozie.util.ParamChecker; 045 046public class BundleKillXCommand extends KillTransitionXCommand { 047 private final String jobId; 048 private BundleJobBean bundleJob; 049 private List<BundleActionBean> bundleActions; 050 051 public BundleKillXCommand(String jobId) { 052 super("bundle_kill", "bundle_kill", 1); 053 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 054 } 055 056 @Override 057 public String getEntityKey() { 058 return jobId; 059 } 060 061 @Override 062 public String getKey() { 063 return getName() + "_" + jobId; 064 } 065 066 @Override 067 protected boolean isLockRequired() { 068 return true; 069 } 070 071 @Override 072 public void loadState() throws CommandException { 073 try { 074 this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId); 075 this.bundleActions = BundleActionQueryExecutor.getInstance().getList( 076 BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId); 077 LogUtils.setLogInfo(bundleJob); 078 super.setJob(bundleJob); 079 080 } 081 catch (XException ex) { 082 throw new CommandException(ex); 083 } 084 } 085 086 @Override 087 protected void verifyPrecondition() throws CommandException, PreconditionException { 088 if (bundleJob.getStatus() == Job.Status.SUCCEEDED 089 || bundleJob.getStatus() == Job.Status.FAILED 090 || bundleJob.getStatus() == Job.Status.DONEWITHERROR 091 || bundleJob.getStatus() == Job.Status.KILLED) { 092 LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = " 093 + jobId + ", status = " + bundleJob.getStatus()); 094 throw new PreconditionException(ErrorCode.E1323, jobId); 095 } 096 } 097 098 @Override 099 public void killChildren() throws CommandException { 100 if (bundleActions != null) { 101 for (BundleActionBean action : bundleActions) { 102 if (action.getCoordId() != null) { 103 queue(new CoordKillXCommand(action.getCoordId())); 104 updateBundleAction(action); 105 LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]", 106 action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId()); 107 } else { 108 updateBundleAction(action); 109 LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]", action.getBundleActionId(), action 110 .getStatus(), action.getPending()); 111 } 112 113 } 114 } 115 LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId); 116 } 117 118 /** 119 * Update bundle action 120 * 121 * @param action 122 * @throws CommandException 123 */ 124 private void updateBundleAction(BundleActionBean action) { 125 action.setLastModifiedTime(new Date()); 126 if (!action.isTerminalStatus()) { 127 action.incrementAndGetPending(); 128 action.setStatus(Job.Status.KILLED); 129 } 130 else { 131 // Due to race condition bundle action pending might be true 132 // while coordinator is killed. 133 if (action.isPending()) { 134 if (action.getCoordId() == null) { 135 action.setPending(0); 136 } 137 else { 138 try { 139 CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get( 140 CoordJobQuery.GET_COORD_JOB, action.getCoordId()); 141 if (!coordJob.isPending() && coordJob.isTerminalStatus()) { 142 action.setPending(0); 143 action.setStatus(coordJob.getStatus()); 144 } 145 } 146 catch (JPAExecutorException e) { 147 LOG.warn("Error in checking coord job status:" + action.getCoordId(), e); 148 } 149 } 150 } 151 } 152 updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action)); 153 } 154 155 @Override 156 public void notifyParent() { 157 } 158 159 @Override 160 public Job getJob() { 161 return bundleJob; 162 } 163 164 @Override 165 public void updateJob() { 166 updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, bundleJob)); 167 } 168 169 @Override 170 public void performWrites() throws CommandException { 171 try { 172 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 173 } 174 catch (JPAExecutorException e) { 175 throw new CommandException(e); 176 } 177 } 178 179}