001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.bundle; 019 020import java.util.Date; 021import java.util.List; 022 023import org.apache.oozie.BundleActionBean; 024import org.apache.oozie.BundleJobBean; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.XException; 028import org.apache.oozie.client.Job; 029import org.apache.oozie.command.CommandException; 030import org.apache.oozie.command.KillTransitionXCommand; 031import org.apache.oozie.command.PreconditionException; 032import org.apache.oozie.command.coord.CoordKillXCommand; 033import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 034import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 035import org.apache.oozie.executor.jpa.BatchQueryExecutor; 036import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 037import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 038import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 041import org.apache.oozie.executor.jpa.JPAExecutorException; 042import org.apache.oozie.util.LogUtils; 043import org.apache.oozie.util.ParamChecker; 044 045public class BundleKillXCommand extends KillTransitionXCommand { 046 private final String jobId; 047 private BundleJobBean bundleJob; 048 private List<BundleActionBean> bundleActions; 049 050 public BundleKillXCommand(String jobId) { 051 super("bundle_kill", "bundle_kill", 1); 052 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 053 } 054 055 @Override 056 public String getEntityKey() { 057 return jobId; 058 } 059 060 @Override 061 public String getKey() { 062 return getName() + "_" + jobId; 063 } 064 065 @Override 066 protected boolean isLockRequired() { 067 return true; 068 } 069 070 @Override 071 public void loadState() throws CommandException { 072 try { 073 this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId); 074 this.bundleActions = BundleActionQueryExecutor.getInstance().getList( 075 BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId); 076 LogUtils.setLogInfo(bundleJob, logInfo); 077 super.setJob(bundleJob); 078 079 } 080 catch (XException ex) { 081 throw new CommandException(ex); 082 } 083 } 084 085 @Override 086 protected void verifyPrecondition() throws CommandException, PreconditionException { 087 if (bundleJob.getStatus() == Job.Status.SUCCEEDED 088 || bundleJob.getStatus() == Job.Status.FAILED 089 || bundleJob.getStatus() == Job.Status.DONEWITHERROR 090 || bundleJob.getStatus() == Job.Status.KILLED) { 091 LOG.info("Bundle job cannot be killed - job already SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = " 092 + jobId + ", status = " + bundleJob.getStatus()); 093 throw new PreconditionException(ErrorCode.E1020, jobId); 094 } 095 } 096 097 @Override 098 public void killChildren() throws CommandException { 099 if (bundleActions != null) { 100 for (BundleActionBean action : bundleActions) { 101 if (action.getCoordId() != null) { 102 queue(new CoordKillXCommand(action.getCoordId())); 103 updateBundleAction(action); 104 LOG.debug("Killed bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordKillXCommand for [{3}]", 105 action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId()); 106 } else { 107 updateBundleAction(action); 108 LOG.debug("Killed bundle action = [{0}], current status = [{1}], pending = [{2}]", action.getBundleActionId(), action 109 .getStatus(), action.getPending()); 110 } 111 112 } 113 } 114 LOG.debug("Killed coord jobs for the bundle=[{0}]", jobId); 115 } 116 117 /** 118 * Update bundle action 119 * 120 * @param action 121 * @throws CommandException 122 */ 123 private void updateBundleAction(BundleActionBean action) { 124 action.setLastModifiedTime(new Date()); 125 if (!action.isTerminalStatus()) { 126 action.incrementAndGetPending(); 127 action.setStatus(Job.Status.KILLED); 128 } 129 else { 130 // Due to race condition bundle action pending might be true 131 // while coordinator is killed. 132 if (action.isPending()) { 133 if (action.getCoordId() == null) { 134 action.setPending(0); 135 } 136 else { 137 try { 138 CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get( 139 CoordJobQuery.GET_COORD_JOB, action.getCoordId()); 140 if (!coordJob.isPending() && coordJob.isTerminalStatus()) { 141 action.setPending(0); 142 action.setStatus(coordJob.getStatus()); 143 } 144 } 145 catch (JPAExecutorException e) { 146 LOG.warn("Error in checking coord job status:" + action.getCoordId(), e); 147 } 148 } 149 } 150 } 151 updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action)); 152 } 153 154 @Override 155 public void notifyParent() { 156 } 157 158 @Override 159 public Job getJob() { 160 return bundleJob; 161 } 162 163 @Override 164 public void updateJob() { 165 updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME, bundleJob)); 166 } 167 168 @Override 169 public void performWrites() throws CommandException { 170 try { 171 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 172 } 173 catch (JPAExecutorException e) { 174 throw new CommandException(e); 175 } 176 } 177 178}