001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.Date; 021 import java.util.List; 022 023 import org.apache.oozie.CoordinatorActionBean; 024 import org.apache.oozie.CoordinatorJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.client.CoordinatorJob; 028 import org.apache.oozie.client.Job; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.command.SuspendTransitionXCommand; 032 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 033 import org.apache.oozie.command.wf.SuspendXCommand; 034 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor; 035 import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor; 036 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 037 import org.apache.oozie.executor.jpa.JPAExecutorException; 038 import org.apache.oozie.service.JPAService; 039 import org.apache.oozie.service.Services; 040 import org.apache.oozie.util.InstrumentUtils; 041 import org.apache.oozie.util.LogUtils; 042 import org.apache.oozie.util.ParamChecker; 043 import org.apache.oozie.util.StatusUtils; 044 045 /** 046 * Suspend coordinator job and actions. 047 * 048 */ 049 public class CoordSuspendXCommand extends SuspendTransitionXCommand { 050 private final String jobId; 051 private CoordinatorJobBean coordJob; 052 private JPAService jpaService; 053 private boolean exceptionOccured = false; 054 private CoordinatorJob.Status prevStatus = null; 055 056 public CoordSuspendXCommand(String id) { 057 super("coord_suspend", "coord_suspend", 1); 058 this.jobId = ParamChecker.notEmpty(id, "id"); 059 } 060 061 /* (non-Javadoc) 062 * @see org.apache.oozie.command.XCommand#getEntityKey() 063 */ 064 @Override 065 public String getEntityKey() { 066 return jobId; 067 } 068 069 /* (non-Javadoc) 070 * @see org.apache.oozie.command.XCommand#isLockRequired() 071 */ 072 @Override 073 protected boolean isLockRequired() { 074 return true; 075 } 076 077 /* (non-Javadoc) 078 * @see org.apache.oozie.command.XCommand#loadState() 079 */ 080 @Override 081 protected void loadState() throws CommandException { 082 super.eagerLoadState(); 083 try { 084 jpaService = Services.get().get(JPAService.class); 085 if (jpaService != null) { 086 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(this.jobId)); 087 prevStatus = coordJob.getStatus(); 088 } 089 else { 090 throw new CommandException(ErrorCode.E0610); 091 } 092 } 093 catch (Exception ex) { 094 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 095 } 096 LogUtils.setLogInfo(this.coordJob, logInfo); 097 } 098 099 /* (non-Javadoc) 100 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 101 */ 102 @Override 103 protected void verifyPrecondition() throws CommandException, PreconditionException { 104 super.eagerVerifyPrecondition(); 105 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 106 || coordJob.getStatus() == CoordinatorJob.Status.FAILED 107 || coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 108 LOG.info("CoordSuspendXCommand is not going to execute because " 109 + "job finished or failed or killed, id = " + jobId + ", status = " + coordJob.getStatus()); 110 throw new PreconditionException(ErrorCode.E0728, jobId, coordJob.getStatus().toString()); 111 } 112 } 113 114 /* (non-Javadoc) 115 * @see org.apache.oozie.command.SuspendTransitionXCommand#suspendChildren() 116 */ 117 @Override 118 public void suspendChildren() throws CommandException { 119 try { 120 //Get all running actions of a job to suspend them 121 List<CoordinatorActionBean> actionList = jpaService 122 .execute(new CoordJobGetActionsRunningJPAExecutor(jobId)); 123 for (CoordinatorActionBean action : actionList) { 124 // queue a SuspendXCommand 125 if (action.getExternalId() != null) { 126 queue(new SuspendXCommand(action.getExternalId())); 127 updateCoordAction(action); 128 LOG.debug( 129 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and queue SuspendXCommand for [{3}]", 130 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 131 } 132 else { 133 updateCoordAction(action); 134 LOG.debug( 135 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null", 136 action.getId(), action.getStatus(), action.getPending()); 137 } 138 139 } 140 LOG.debug("Suspended coordinator actions for the coordinator=[{0}]", jobId); 141 } 142 catch (XException ex) { 143 exceptionOccured = true; 144 throw new CommandException(ex); 145 } 146 finally { 147 if (exceptionOccured) { 148 coordJob.setStatus(CoordinatorJob.Status.FAILED); 149 coordJob.resetPending(); 150 LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = " 151 + coordJob.getStatus()); 152 updateList.add(coordJob); 153 } 154 } 155 } 156 157 /* (non-Javadoc) 158 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 159 */ 160 @Override 161 public void notifyParent() throws CommandException { 162 // update bundle action 163 if (this.coordJob.getBundleId() != null) { 164 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 165 bundleStatusUpdate.call(); 166 } 167 } 168 169 /* (non-Javadoc) 170 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 171 */ 172 @Override 173 public void updateJob() { 174 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 175 coordJob.setLastModifiedTime(new Date()); 176 coordJob.setSuspendedTime(new Date()); 177 LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending()); 178 updateList.add(coordJob); 179 } 180 181 /* (non-Javadoc) 182 * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites() 183 */ 184 @Override 185 public void performWrites() throws CommandException { 186 try { 187 jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null)); 188 } 189 catch (JPAExecutorException jex) { 190 throw new CommandException(jex); 191 } 192 } 193 194 private void updateCoordAction(CoordinatorActionBean action) { 195 action.setStatus(CoordinatorActionBean.Status.SUSPENDED); 196 action.incrementAndGetPending(); 197 action.setLastModifiedTime(new Date()); 198 updateList.add(action); 199 } 200 201 /* (non-Javadoc) 202 * @see org.apache.oozie.command.TransitionXCommand#getJob() 203 */ 204 @Override 205 public Job getJob() { 206 return coordJob; 207 } 208 209 /** 210 * Transit job to suspended from running or to prepsuspended from prep. 211 * 212 * @see org.apache.oozie.command.TransitionXCommand#transitToNext() 213 */ 214 @Override 215 public void transitToNext() { 216 if (coordJob == null) { 217 coordJob = (CoordinatorJobBean) this.getJob(); 218 } 219 if (coordJob.getStatus() == Job.Status.PREP) { 220 coordJob.setStatus(Job.Status.PREPSUSPENDED); 221 coordJob.setStatus(StatusUtils.getStatus(coordJob)); 222 } 223 else if (coordJob.getStatus() == Job.Status.RUNNING) { 224 coordJob.setStatus(Job.Status.SUSPENDED); 225 } 226 else if (coordJob.getStatus() == Job.Status.RUNNINGWITHERROR) { 227 coordJob.setStatus(Job.Status.SUSPENDEDWITHERROR); 228 } 229 else if (coordJob.getStatus() == Job.Status.PAUSED) { 230 coordJob.setStatus(Job.Status.SUSPENDED); 231 } 232 else if (coordJob.getStatus() == Job.Status.PREPPAUSED) { 233 coordJob.setStatus(Job.Status.PREPSUSPENDED); 234 } 235 coordJob.setPending(); 236 } 237 238 }