001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.util.Date; 022import java.util.List; 023import org.apache.oozie.CoordinatorActionBean; 024import org.apache.oozie.CoordinatorActionInfo; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.XException; 028import org.apache.oozie.client.CoordinatorAction; 029import org.apache.oozie.client.CoordinatorJob; 030import org.apache.oozie.client.rest.RestConstants; 031import org.apache.oozie.command.CommandException; 032import org.apache.oozie.command.KillTransitionXCommand; 033import org.apache.oozie.command.PreconditionException; 034import org.apache.oozie.command.wf.KillXCommand; 035import org.apache.oozie.coord.CoordUtils; 036import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 037import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 038import org.apache.oozie.executor.jpa.BatchQueryExecutor; 039import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 041import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 042import org.apache.oozie.executor.jpa.JPAExecutorException; 043import org.apache.oozie.service.EventHandlerService; 044import org.apache.oozie.service.JPAService; 045import org.apache.oozie.service.Services; 046import org.apache.oozie.util.InstrumentUtils; 047import org.apache.oozie.util.LogUtils; 048import org.apache.oozie.util.ParamChecker; 049 050/** 051 * Kill coordinator actions by a range of dates (nominal time) or action number. 052 * <p> 053 * The "range" can be set with {@link RestConstants#JOB_COORD_SCOPE_DATE} or 054 * {@link RestConstants#JOB_COORD_SCOPE_ACTION}. 055 * <p> 056 */ 057public class CoordActionsKillXCommand extends KillTransitionXCommand<CoordinatorActionInfo> { 058 059 private String jobId; 060 private CoordinatorJobBean coordJob; 061 List<CoordinatorActionBean> coordActions; 062 private String rangeType; 063 private String scope; 064 private JPAService jpaService = null; 065 066 public CoordActionsKillXCommand(String id, String rangeType, String scope) { 067 super("coord_action_kill", "coord_action_kill", 2); 068 this.jobId = id; 069 this.rangeType = ParamChecker.notEmpty(rangeType, "rangeType"); 070 this.scope = ParamChecker.notEmpty(scope, "scope"); 071 } 072 073 @Override 074 protected boolean isLockRequired() { 075 return true; 076 } 077 078 @Override 079 public String getEntityKey() { 080 return this.jobId; 081 } 082 083 @Override 084 protected void loadState() throws CommandException { 085 try { 086 jpaService = Services.get().get(JPAService.class); 087 if (jpaService != null) { 088 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_ACTION_KILL, jobId); 089 LogUtils.setLogInfo(coordJob); 090 coordActions = CoordUtils.getCoordActions(rangeType, coordJob.getId(), scope, true); 091 } 092 else { 093 throw new CommandException(ErrorCode.E0610); 094 } 095 } 096 catch (XException ex) { 097 throw new CommandException(ex); 098 } 099 } 100 101 @Override 102 protected void verifyPrecondition() throws CommandException, PreconditionException { 103 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 104 || coordJob.getStatus() == CoordinatorJob.Status.FAILED 105 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 106 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 107 LOG.info("Coord actions not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = " 108 + jobId + ", status = " + coordJob.getStatus()); 109 throw new PreconditionException(ErrorCode.E1020, jobId); 110 } 111 } 112 113 @Override 114 public void transitToNext() { 115 } 116 117 @Override 118 public void killChildren() throws CommandException { 119 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 120 for (CoordinatorActionBean coordAction : coordActions) { 121 coordAction.setStatus(CoordinatorAction.Status.KILLED); 122 coordAction.setLastModifiedTime(new Date()); 123 // kill Workflow job associated with this Coord action 124 if (coordAction.getExternalId() != null) { 125 queue(new KillXCommand(coordAction.getExternalId())); 126 coordAction.incrementAndGetPending(); 127 } 128 else { 129 coordAction.setPending(0); 130 } 131 updateList.add(new UpdateEntry(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction)); 132 if (EventHandlerService.isEnabled()) { 133 CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), 134 coordAction.getCreatedTime()); 135 } 136 queue(new CoordActionNotificationXCommand(coordAction), 100); 137 } 138 CoordinatorActionInfo coordInfo = new CoordinatorActionInfo(coordActions); 139 ret = coordInfo; 140 } 141 142 /* 143 * (non-Javadoc) 144 * 145 * @see org.apache.oozie.command.KillTransitionXCommand#performWrites() 146 */ 147 @Override 148 public void performWrites() throws CommandException { 149 try { 150 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 151 } 152 catch (JPAExecutorException e) { 153 throw new CommandException(e); 154 } 155 } 156 157 @Override 158 public void updateJob() throws CommandException { 159 coordJob.setPending(); 160 updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob)); 161 } 162 163 @Override 164 public void notifyParent() throws CommandException { 165 } 166 167}