001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.util.Date; 021import java.util.List; 022import org.apache.oozie.CoordinatorActionBean; 023import org.apache.oozie.CoordinatorActionInfo; 024import org.apache.oozie.CoordinatorJobBean; 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.XException; 027import org.apache.oozie.client.CoordinatorAction; 028import org.apache.oozie.client.CoordinatorJob; 029import org.apache.oozie.client.rest.RestConstants; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.KillTransitionXCommand; 032import org.apache.oozie.command.PreconditionException; 033import org.apache.oozie.command.wf.KillXCommand; 034import org.apache.oozie.coord.CoordUtils; 035import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 037import org.apache.oozie.executor.jpa.BatchQueryExecutor; 038import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 041import org.apache.oozie.executor.jpa.JPAExecutorException; 042import org.apache.oozie.service.EventHandlerService; 043import org.apache.oozie.service.JPAService; 044import org.apache.oozie.service.Services; 045import org.apache.oozie.util.InstrumentUtils; 046import org.apache.oozie.util.LogUtils; 047import org.apache.oozie.util.ParamChecker; 048 049/** 050 * Kill coordinator actions by a range of dates (nominal time) or action number. 051 * <p/> 052 * The "range" can be set with {@link RestConstants.JOB_COORD_SCOPE_DATE} or 053 * {@link RestConstants.JOB_COORD_SCOPE_ACTION}. 054 * <p/> 055 */ 056public class CoordActionsKillXCommand extends KillTransitionXCommand<CoordinatorActionInfo> { 057 058 private String jobId; 059 private CoordinatorJobBean coordJob; 060 List<CoordinatorActionBean> coordActions; 061 private String rangeType; 062 private String scope; 063 private JPAService jpaService = null; 064 065 public CoordActionsKillXCommand(String id, String rangeType, String scope) { 066 super("coord_action_kill", "coord_action_kill", 2); 067 this.jobId = id; 068 this.rangeType = ParamChecker.notEmpty(rangeType, "rangeType"); 069 this.scope = ParamChecker.notEmpty(scope, "scope"); 070 } 071 072 @Override 073 protected boolean isLockRequired() { 074 return true; 075 } 076 077 @Override 078 public String getEntityKey() { 079 return this.jobId; 080 } 081 082 @Override 083 protected void loadState() throws CommandException { 084 try { 085 jpaService = Services.get().get(JPAService.class); 086 if (jpaService != null) { 087 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_ACTION_KILL, jobId); 088 LogUtils.setLogInfo(coordJob, logInfo); 089 coordActions = CoordUtils.getCoordActions(rangeType, coordJob.getId(), scope, true); 090 } 091 else { 092 throw new CommandException(ErrorCode.E0610); 093 } 094 } 095 catch (XException ex) { 096 throw new CommandException(ex); 097 } 098 } 099 100 @Override 101 protected void verifyPrecondition() throws CommandException, PreconditionException { 102 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 103 || coordJob.getStatus() == CoordinatorJob.Status.FAILED 104 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 105 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 106 LOG.info("Coord actions not killed - job either finished SUCCEEDED, FAILED, KILLED or DONEWITHERROR, job id = " 107 + jobId + ", status = " + coordJob.getStatus()); 108 throw new PreconditionException(ErrorCode.E1020, jobId); 109 } 110 } 111 112 @Override 113 public void transitToNext() { 114 } 115 116 @Override 117 public void killChildren() throws CommandException { 118 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 119 for (CoordinatorActionBean coordAction : coordActions) { 120 coordAction.setStatus(CoordinatorAction.Status.KILLED); 121 coordAction.setLastModifiedTime(new Date()); 122 // kill Workflow job associated with this Coord action 123 if (coordAction.getExternalId() != null) { 124 queue(new KillXCommand(coordAction.getExternalId())); 125 coordAction.incrementAndGetPending(); 126 } 127 else { 128 coordAction.setPending(0); 129 } 130 updateList.add(new UpdateEntry(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction)); 131 if (EventHandlerService.isEnabled()) { 132 CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), 133 coordAction.getCreatedTime()); 134 } 135 queue(new CoordActionNotificationXCommand(coordAction), 100); 136 } 137 CoordinatorActionInfo coordInfo = new CoordinatorActionInfo(coordActions); 138 ret = coordInfo; 139 } 140 141 /* 142 * (non-Javadoc) 143 * 144 * @see org.apache.oozie.command.KillTransitionXCommand#performWrites() 145 */ 146 @Override 147 public void performWrites() throws CommandException { 148 try { 149 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 150 } 151 catch (JPAExecutorException e) { 152 throw new CommandException(e); 153 } 154 } 155 156 @Override 157 public void updateJob() throws CommandException { 158 coordJob.setPending(); 159 updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob)); 160 } 161 162 @Override 163 public void notifyParent() throws CommandException { 164 } 165 166}