001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import org.apache.oozie.client.CoordinatorAction; 022import org.apache.oozie.client.CoordinatorJob; 023import org.apache.oozie.client.Job; 024import org.apache.oozie.CoordinatorActionBean; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.XException; 028import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 029import org.apache.oozie.command.wf.KillXCommand; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.KillTransitionXCommand; 032import org.apache.oozie.command.PreconditionException; 033import org.apache.oozie.dependency.DependencyChecker; 034import org.apache.oozie.executor.jpa.BatchQueryExecutor; 035import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 037import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor; 038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 040import org.apache.oozie.executor.jpa.JPAExecutorException; 041import org.apache.oozie.service.EventHandlerService; 042import org.apache.oozie.service.JPAService; 043import org.apache.oozie.service.Services; 044import org.apache.oozie.util.LogUtils; 045import org.apache.oozie.util.ParamChecker; 046import org.apache.oozie.util.StatusUtils; 047 048import java.util.Arrays; 049import java.util.Date; 050import java.util.List; 051 052public class CoordKillXCommand extends KillTransitionXCommand { 053 054 private final String jobId; 055 private CoordinatorJobBean coordJob; 056 private List<CoordinatorActionBean> actionList; 057 private JPAService jpaService = null; 058 private CoordinatorJob.Status prevStatus = null; 059 060 public CoordKillXCommand(String id) { 061 super("coord_kill", "coord_kill", 2); 062 this.jobId = ParamChecker.notEmpty(id, "id"); 063 } 064 065 @Override 066 protected boolean isLockRequired() { 067 return true; 068 } 069 070 @Override 071 public String getEntityKey() { 072 return this.jobId; 073 } 074 075 @Override 076 public String getKey() { 077 return getName() + "_" + this.jobId; 078 } 079 080 @Override 081 protected void loadState() throws CommandException { 082 try { 083 jpaService = Services.get().get(JPAService.class); 084 085 if (jpaService != null) { 086 this.coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, jobId); 087 //Get actions which are not succeeded, failed, timed out or killed 088 this.actionList = jpaService.execute(new CoordJobGetActionsNotCompletedJPAExecutor(jobId)); 089 prevStatus = coordJob.getStatus(); 090 LogUtils.setLogInfo(coordJob); 091 } 092 else { 093 throw new CommandException(ErrorCode.E0610); 094 } 095 } 096 catch (XException ex) { 097 throw new CommandException(ex); 098 } 099 } 100 101 @Override 102 protected void verifyPrecondition() throws CommandException, PreconditionException { 103 // if namespace 0.1 is used and backward support is true, SUCCEEDED coord job can be killed 104 if (StatusUtils.isV1CoordjobKillable(coordJob)) { 105 return; 106 } 107 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 108 || coordJob.getStatus() == CoordinatorJob.Status.FAILED 109 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 110 || coordJob.getStatus() == CoordinatorJob.Status.KILLED 111 || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) { 112 LOG.info("CoordKillXCommand not killed - job either finished SUCCEEDED, FAILED, KILLED, DONEWITHERROR" 113 + " or IGNORED,job id = " + jobId + ", status = " + coordJob.getStatus()); 114 throw new PreconditionException(ErrorCode.E1020, jobId); 115 } 116 } 117 118 private void updateCoordAction(CoordinatorActionBean action, boolean makePending) { 119 CoordinatorAction.Status prevStatus = action.getStatus(); 120 action.setStatus(CoordinatorActionBean.Status.KILLED); 121 if (makePending) { 122 action.incrementAndGetPending(); 123 } else { 124 // set pending to false 125 action.setPending(0); 126 } 127 if (EventHandlerService.isEnabled() && prevStatus != CoordinatorAction.Status.RUNNING 128 && prevStatus != CoordinatorAction.Status.SUSPENDED) { 129 CoordinatorXCommand.generateEvent(action, coordJob.getUser(), coordJob.getAppName(), null); 130 } 131 action.setLastModifiedTime(new Date()); 132 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,action)); 133 } 134 135 @Override 136 public void killChildren() throws CommandException { 137 if (actionList != null) { 138 for (CoordinatorActionBean action : actionList) { 139 // queue a WorkflowKillXCommand to delete the workflow job and actions 140 if (action.getExternalId() != null) { 141 queue(new KillXCommand(action.getExternalId())); 142 // As the kill command for children is queued, set pending flag for coord action to be true 143 updateCoordAction(action, true); 144 LOG.debug( 145 "Killed coord action = [{0}], new status = [{1}], pending = [{2}] and queue KillXCommand for [{3}]", 146 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 147 } 148 else { 149 // As killing children is not required, set pending flag for coord action to be false 150 updateCoordAction(action, false); 151 LOG.debug("Killed coord action = [{0}], current status = [{1}], pending = [{2}]", 152 action.getId(), action.getStatus(), action.getPending()); 153 } 154 String pushMissingDeps = action.getPushMissingDependencies(); 155 if (pushMissingDeps != null) { 156 CoordPushDependencyCheckXCommand.unregisterMissingDependencies( 157 Arrays.asList(DependencyChecker.dependenciesAsArray(pushMissingDeps)), action.getId()); 158 } 159 } 160 } 161 coordJob.setDoneMaterialization(); 162 coordJob.setLastModifiedTime(new Date()); 163 LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId); 164 } 165 166 @Override 167 public void notifyParent() throws CommandException { 168 // update bundle action 169 if (coordJob.getBundleId() != null) { 170 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 171 bundleStatusUpdate.call(); 172 } 173 } 174 175 @Override 176 public void updateJob() throws CommandException { 177 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob)); 178 } 179 180 @Override 181 public void performWrites() throws CommandException { 182 try { 183 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 184 } 185 catch (JPAExecutorException e) { 186 throw new CommandException(e); 187 } 188 } 189 190 @Override 191 public Job getJob() { 192 return coordJob; 193 } 194 195}