001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.oozie.client.CoordinatorAction; 021 import org.apache.oozie.client.CoordinatorJob; 022 import org.apache.oozie.client.Job; 023 import org.apache.oozie.CoordinatorActionBean; 024 import org.apache.oozie.CoordinatorJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 028 import org.apache.oozie.command.wf.KillXCommand; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.KillTransitionXCommand; 031 import org.apache.oozie.command.PreconditionException; 032 import org.apache.oozie.dependency.DependencyChecker; 033 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor; 034 import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor; 035 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 036 import org.apache.oozie.executor.jpa.JPAExecutorException; 037 import org.apache.oozie.service.EventHandlerService; 038 import org.apache.oozie.service.JPAService; 039 import org.apache.oozie.service.Services; 040 import org.apache.oozie.util.LogUtils; 041 import org.apache.oozie.util.ParamChecker; 042 import org.apache.oozie.util.StatusUtils; 043 044 import java.util.Arrays; 045 import java.util.Date; 046 import java.util.List; 047 048 public class CoordKillXCommand extends KillTransitionXCommand { 049 050 private final String jobId; 051 private CoordinatorJobBean coordJob; 052 private List<CoordinatorActionBean> actionList; 053 private JPAService jpaService = null; 054 private CoordinatorJob.Status prevStatus = null; 055 056 public CoordKillXCommand(String id) { 057 super("coord_kill", "coord_kill", 2); 058 this.jobId = ParamChecker.notEmpty(id, "id"); 059 } 060 061 @Override 062 protected boolean isLockRequired() { 063 return true; 064 } 065 066 @Override 067 public String getEntityKey() { 068 return this.jobId; 069 } 070 071 @Override 072 public String getKey() { 073 return getName() + "_" + this.jobId; 074 } 075 076 @Override 077 protected void loadState() throws CommandException { 078 try { 079 jpaService = Services.get().get(JPAService.class); 080 081 if (jpaService != null) { 082 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 083 //Get actions which are not succeeded, failed, timed out or killed 084 this.actionList = jpaService.execute(new CoordJobGetActionsNotCompletedJPAExecutor(jobId)); 085 prevStatus = coordJob.getStatus(); 086 LogUtils.setLogInfo(coordJob, logInfo); 087 } 088 else { 089 throw new CommandException(ErrorCode.E0610); 090 } 091 } 092 catch (XException ex) { 093 throw new CommandException(ex); 094 } 095 } 096 097 @Override 098 protected void verifyPrecondition() throws CommandException, PreconditionException { 099 // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed 100 if (StatusUtils.isV1CoordjobKillable(coordJob)) { 101 return; 102 } 103 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 104 || coordJob.getStatus() == CoordinatorJob.Status.FAILED 105 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 106 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 107 LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = " 108 + jobId + ", status = " + coordJob.getStatus()); 109 throw new PreconditionException(ErrorCode.E1020, jobId); 110 } 111 } 112 113 private void updateCoordAction(CoordinatorActionBean action, boolean makePending) { 114 CoordinatorAction.Status prevStatus = action.getStatus(); 115 action.setStatus(CoordinatorActionBean.Status.KILLED); 116 if (makePending) { 117 action.incrementAndGetPending(); 118 } else { 119 // set pending to false 120 action.setPending(0); 121 } 122 if (EventHandlerService.isEnabled() && prevStatus != CoordinatorAction.Status.RUNNING 123 && prevStatus != CoordinatorAction.Status.SUSPENDED) { 124 CoordinatorXCommand.generateEvent(action, coordJob.getUser(), coordJob.getAppName(), null); 125 } 126 action.setLastModifiedTime(new Date()); 127 updateList.add(action); 128 } 129 130 @Override 131 public void killChildren() throws CommandException { 132 if (actionList != null) { 133 for (CoordinatorActionBean action : actionList) { 134 // queue a WorkflowKillXCommand to delete the workflow job and actions 135 if (action.getExternalId() != null) { 136 queue(new KillXCommand(action.getExternalId())); 137 // As the kill command for children is queued, set pending flag for coord action to be true 138 updateCoordAction(action, true); 139 LOG.debug( 140 "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]", 141 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 142 } 143 else { 144 // As killing children is not required, set pending flag for coord action to be false 145 updateCoordAction(action, false); 146 LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]", 147 action.getId(), action.getStatus(), action.getPending()); 148 } 149 String pushMissingDeps = action.getPushMissingDependencies(); 150 if (pushMissingDeps != null) { 151 CoordPushDependencyCheckXCommand.unregisterMissingDependencies( 152 Arrays.asList(DependencyChecker.dependenciesAsArray(pushMissingDeps)), action.getId()); 153 } 154 } 155 } 156 coordJob.setDoneMaterialization(); 157 updateList.add(coordJob); 158 159 LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId); 160 } 161 162 @Override 163 public void notifyParent() throws CommandException { 164 // update bundle action 165 if (coordJob.getBundleId() != null) { 166 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 167 bundleStatusUpdate.call(); 168 } 169 } 170 171 @Override 172 public void updateJob() throws CommandException { 173 updateList.add(coordJob); 174 } 175 176 @Override 177 public void performWrites() throws CommandException { 178 try { 179 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null)); 180 } 181 catch (JPAExecutorException e) { 182 throw new CommandException(e); 183 } 184 } 185 186 @Override 187 public Job getJob() { 188 return coordJob; 189 } 190 191 }