/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.util.Date;
import java.util.List;

import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.SuspendTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;

/**
 * Suspend coordinator job and actions.
 * <p>
 * The command loads the coordinator job identified by the id given at construction time,
 * refuses to run when that job is already SUCCEEDED, FAILED or KILLED, moves the job to its
 * suspended status, marks each running coordinator action as SUSPENDED (queueing a workflow
 * {@link SuspendXCommand} for actions that have an external id) and finally persists the job.
 */
public class CoordSuspendXCommand extends SuspendTransitionXCommand {
    /** Id of the coordinator job to suspend; also used as the entity key. */
    private final String jobId;
    /** Job bean loaded in {@link #loadState()}. */
    private CoordinatorJobBean coordJob;
    /** JPA service resolved in {@link #loadState()} and reused for every persistence call. */
    private JPAService jpaService;
    /** Set when suspending the child actions failed; triggers failing the job in the finally block. */
    private boolean exceptionOccured = false;
    /** Status the job had when it was loaded; handed to the bundle status update. */
    private CoordinatorJob.Status prevStatus = null;

    /**
     * Create a suspend command for a coordinator job.
     *
     * @param id coordinator job id; checked with {@code ParamChecker.notEmpty}
     */
    public CoordSuspendXCommand(String id) {
        super("coord_suspend", "coord_suspend", 1);
        this.jobId = ParamChecker.notEmpty(id, "id");
    }

    /**
     * Return the coordinator job id as the entity key of this command.
     *
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return jobId;
    }

    /**
     * This command always asks for a lock.
     *
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Resolve the JPA service, load the coordinator job bean and remember its current status.
     *
     * @throws CommandException with {@link ErrorCode#E0610} when no JPA service is available,
     *         or with {@link ErrorCode#E0603} wrapping any other failure while loading the job
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        super.eagerLoadState();
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService == null) {
                throw new CommandException(ErrorCode.E0610);
            }
            this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(this.jobId));
            prevStatus = coordJob.getStatus();
        }
        catch (CommandException ex) {
            // Propagate as-is so the specific error code (E0610) is not masked by the
            // generic handler below, which would re-wrap it as E0603.
            throw ex;
        }
        catch (Exception ex) {
            throw new CommandException(ErrorCode.E0603, ex);
        }
        LogUtils.setLogInfo(this.coordJob, logInfo);
    }

    /**
     * Reject the command when the job has already reached SUCCEEDED, FAILED or KILLED.
     *
     * @throws PreconditionException with {@link ErrorCode#E0728} when the job is in one of
     *         those statuses
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        super.eagerVerifyPrecondition();
        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED
                || coordJob.getStatus() == CoordinatorJob.Status.FAILED
                || coordJob.getStatus() == CoordinatorJob.Status.KILLED) {
            LOG.info("CoordSuspendXCommand is not going to execute because "
                    + "job finished or failed or killed, id = " + jobId + ", status = " + coordJob.getStatus());
            throw new PreconditionException(ErrorCode.E0728, jobId, coordJob.getStatus().toString());
        }
    }

    /**
     * Suspend every running action of the coordinator job.
     * <p>
     * Actions with an external id additionally get a {@link SuspendXCommand} queued for that id.
     * If an {@link XException} is raised, the coordinator job is set to FAILED, its pending
     * flag is reset and the job is persisted (a failure of that persist is only logged) before
     * the exception is rethrown wrapped in a {@link CommandException}.
     *
     * @throws CommandException wrapping any {@link XException} raised while suspending actions
     * @see org.apache.oozie.command.SuspendTransitionXCommand#suspendChildren()
     */
    @Override
    public void suspendChildren() throws CommandException {
        try {
            //Get all running actions of a job to suspend them
            List<CoordinatorActionBean> actionList = jpaService
                    .execute(new CoordJobGetActionsRunningJPAExecutor(jobId));
            for (CoordinatorActionBean action : actionList) {
                // queue a SuspendXCommand
                if (action.getExternalId() != null) {
                    queue(new SuspendXCommand(action.getExternalId()));
                    updateCoordAction(action);
                    LOG.debug(
                            "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and queue SuspendXCommand for [{3}]",
                            action.getId(), action.getStatus(), action.getPending(), action.getExternalId());
                }
                else {
                    updateCoordAction(action);
                    LOG.debug(
                            "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null",
                            action.getId(), action.getStatus(), action.getPending());
                }

            }
            LOG.debug("Suspended coordinator actions for the coordinator=[{0}]", jobId);
        }
        catch (XException ex) {
            exceptionOccured = true;
            throw new CommandException(ex);
        }
        finally {
            if (exceptionOccured) {
                coordJob.setStatus(CoordinatorJob.Status.FAILED);
                coordJob.resetPending();
                LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = "
                        + coordJob.getStatus());
                try {
                    jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
                }
                catch (JPAExecutorException je) {
                    // Best effort: the original exception is already propagating.
                    LOG.error("Failed to update coordinator job : " + jobId, je);
                }
            }
        }
    }

    /**
     * Run a {@link BundleStatusUpdateXCommand} when the coordinator job has a bundle id.
     *
     * @throws CommandException if the bundle status update fails
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() throws CommandException {
        // update bundle action
        if (this.coordJob.getBundleId() != null) {
            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus);
            bundleStatusUpdate.call();
        }
    }

    /**
     * Increment the job counter for this command, stamp the job with the current time as both
     * last-modified and suspended time, and persist it.
     *
     * @throws CommandException wrapping a {@link JPAExecutorException} raised by the update
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() throws CommandException {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
        coordJob.setLastModifiedTime(new Date());
        coordJob.setSuspendedTime(new Date());
        LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending());
        try {
            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /**
     * Mark a coordinator action as SUSPENDED, bump its pending counter, refresh its
     * last-modified time and persist the change.
     *
     * @param action coordinator action to update
     * @throws CommandException wrapping a {@link JPAExecutorException} raised by the update
     */
    private void updateCoordAction(CoordinatorActionBean action) throws CommandException {
        action.setStatus(CoordinatorActionBean.Status.SUSPENDED);
        action.incrementAndGetPending();
        action.setLastModifiedTime(new Date());
        try {
            jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /**
     * Return the coordinator job bean held by this command.
     *
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return coordJob;
    }

    /**
     * Transit job to suspended from running or to prepsuspended from prep.
     * <p>
     * For a PREP job the status is first set to PREPSUSPENDED and then replaced by whatever
     * {@code StatusUtils.getStatus(coordJob)} returns. Jobs in any other status keep their
     * status. The pending flag is set in every case.
     *
     * @see org.apache.oozie.command.TransitionXCommand#transitToNext()
     */
    @Override
    public void transitToNext() {
        if (coordJob == null) {
            coordJob = (CoordinatorJobBean) this.getJob();
        }
        if (coordJob.getStatus() == Job.Status.PREP) {
            coordJob.setStatus(Job.Status.PREPSUSPENDED);
            // NOTE(review): presumably maps the status for backward compatibility - confirm against StatusUtils.
            coordJob.setStatus(StatusUtils.getStatus(coordJob));
        }
        else if (coordJob.getStatus() == Job.Status.RUNNING) {
            coordJob.setStatus(Job.Status.SUSPENDED);
        }
        coordJob.setPending();
    }

}