001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.util.Date; 022import java.util.List; 023 024import org.apache.oozie.CoordinatorActionBean; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.XException; 028import org.apache.oozie.client.CoordinatorJob; 029import org.apache.oozie.client.Job; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.PreconditionException; 032import org.apache.oozie.command.SuspendTransitionXCommand; 033import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 034import org.apache.oozie.command.wf.SuspendXCommand; 035import org.apache.oozie.executor.jpa.BatchQueryExecutor; 036import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 037import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 038import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor; 039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 040import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 041import org.apache.oozie.executor.jpa.JPAExecutorException; 042import org.apache.oozie.service.JPAService; 043import org.apache.oozie.service.Services; 044import org.apache.oozie.util.InstrumentUtils; 045import org.apache.oozie.util.LogUtils; 046import org.apache.oozie.util.ParamChecker; 047import org.apache.oozie.util.StatusUtils; 048 049/** 050 * Suspend coordinator job and actions. 051 * 052 */ 053public class CoordSuspendXCommand extends SuspendTransitionXCommand { 054 private final String jobId; 055 private CoordinatorJobBean coordJob; 056 private JPAService jpaService; 057 private boolean exceptionOccured = false; 058 private CoordinatorJob.Status prevStatus = null; 059 060 public CoordSuspendXCommand(String id) { 061 super("coord_suspend", "coord_suspend", 1); 062 this.jobId = ParamChecker.notEmpty(id, "id"); 063 } 064 065 @Override 066 public String getEntityKey() { 067 return jobId; 068 } 069 070 @Override 071 public String getKey() { 072 return getName() + "_" + jobId; 073 } 074 075 @Override 076 protected boolean isLockRequired() { 077 return true; 078 } 079 080 @Override 081 protected void loadState() throws CommandException { 082 super.eagerLoadState(); 083 try { 084 jpaService = Services.get().get(JPAService.class); 085 if (jpaService != null) { 086 this.coordJob = CoordJobQueryExecutor.getInstance() 087 .get(CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, this.jobId); 088 prevStatus = coordJob.getStatus(); 089 } 090 else { 091 throw new CommandException(ErrorCode.E0610); 092 } 093 } 094 catch (Exception ex) { 095 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 096 } 097 LogUtils.setLogInfo(this.coordJob); 098 } 099 100 @Override 101 protected void verifyPrecondition() throws CommandException, PreconditionException { 102 super.eagerVerifyPrecondition(); 103 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 104 || coordJob.getStatus() == CoordinatorJob.Status.FAILED 105 || coordJob.getStatus() == CoordinatorJob.Status.KILLED 106 || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) { 107 LOG.info("CoordSuspendXCommand is not going to execute because " 108 + "job finished or failed or killed, id = " + jobId + ", status = " + coordJob.getStatus()); 109 throw new PreconditionException(ErrorCode.E0728, jobId, coordJob.getStatus().toString()); 110 } 111 } 112 113 @Override 114 public void suspendChildren() throws CommandException { 115 try { 116 //Get all running actions of a job to suspend them 117 List<CoordinatorActionBean> actionList = jpaService 118 .execute(new CoordJobGetActionsRunningJPAExecutor(jobId)); 119 for (CoordinatorActionBean action : actionList) { 120 // queue a SuspendXCommand 121 if (action.getExternalId() != null) { 122 queue(new SuspendXCommand(action.getExternalId())); 123 updateCoordAction(action); 124 LOG.debug( 125 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and queue SuspendXCommand for [{3}]", 126 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 127 } 128 else { 129 updateCoordAction(action); 130 LOG.debug( 131 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null", 132 action.getId(), action.getStatus(), action.getPending()); 133 } 134 135 } 136 LOG.debug("Suspended coordinator actions for the coordinator=[{0}]", jobId); 137 } 138 catch (XException ex) { 139 exceptionOccured = true; 140 throw new CommandException(ex); 141 } 142 finally { 143 if (exceptionOccured) { 144 coordJob.setStatus(CoordinatorJob.Status.FAILED); 145 coordJob.resetPending(); 146 LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = " 147 + coordJob.getStatus()); 148 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob)); 149 } 150 } 151 } 152 153 @Override 154 public void notifyParent() throws CommandException { 155 // update bundle action 156 if (this.coordJob.getBundleId() != null) { 157 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 158 bundleStatusUpdate.call(); 159 } 160 } 161 162 @Override 163 public void updateJob() { 164 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 165 coordJob.setLastModifiedTime(new Date()); 166 coordJob.setSuspendedTime(new Date()); 167 LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending()); 168 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob)); 169 } 170 171 @Override 172 public void performWrites() throws CommandException { 173 try { 174 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 175 } 176 catch (JPAExecutorException jex) { 177 throw new CommandException(jex); 178 } 179 } 180 181 private void updateCoordAction(CoordinatorActionBean action) { 182 action.setStatus(CoordinatorActionBean.Status.SUSPENDED); 183 action.incrementAndGetPending(); 184 action.setLastModifiedTime(new Date()); 185 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action)); 186 } 187 188 @Override 189 public Job getJob() { 190 return coordJob; 191 } 192 193 /** 194 * Transit job to suspended from running or to prepsuspended from prep. 195 * 196 * @see org.apache.oozie.command.TransitionXCommand#transitToNext() 197 */ 198 @Override 199 public void transitToNext() { 200 if (coordJob == null) { 201 coordJob = (CoordinatorJobBean) this.getJob(); 202 } 203 if (coordJob.getStatus() == Job.Status.PREP) { 204 coordJob.setStatus(Job.Status.PREPSUSPENDED); 205 coordJob.setStatus(StatusUtils.getStatus(coordJob)); 206 } 207 else if (coordJob.getStatus() == Job.Status.RUNNING) { 208 coordJob.setStatus(Job.Status.SUSPENDED); 209 } 210 else if (coordJob.getStatus() == Job.Status.RUNNINGWITHERROR || coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) { 211 coordJob.setStatus(Job.Status.SUSPENDEDWITHERROR); 212 } 213 else if (coordJob.getStatus() == Job.Status.PAUSED) { 214 coordJob.setStatus(Job.Status.SUSPENDED); 215 } 216 else if (coordJob.getStatus() == Job.Status.PREPPAUSED) { 217 coordJob.setStatus(Job.Status.PREPSUSPENDED); 218 } 219 coordJob.setPending(); 220 } 221 222}