001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import java.util.Date; 021import java.util.List; 022 023import org.apache.oozie.CoordinatorActionBean; 024import org.apache.oozie.CoordinatorJobBean; 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.XException; 027import org.apache.oozie.client.CoordinatorJob; 028import org.apache.oozie.client.Job; 029import org.apache.oozie.command.CommandException; 030import org.apache.oozie.command.PreconditionException; 031import org.apache.oozie.command.SuspendTransitionXCommand; 032import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 033import org.apache.oozie.command.wf.SuspendXCommand; 034import org.apache.oozie.executor.jpa.BatchQueryExecutor; 035import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 036import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 037import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor; 038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 040import org.apache.oozie.executor.jpa.JPAExecutorException; 041import org.apache.oozie.service.JPAService; 042import org.apache.oozie.service.Services; 043import org.apache.oozie.util.InstrumentUtils; 044import org.apache.oozie.util.LogUtils; 045import org.apache.oozie.util.ParamChecker; 046import org.apache.oozie.util.StatusUtils; 047 048/** 049 * Suspend coordinator job and actions. 050 * 051 */ 052public class CoordSuspendXCommand extends SuspendTransitionXCommand { 053 private final String jobId; 054 private CoordinatorJobBean coordJob; 055 private JPAService jpaService; 056 private boolean exceptionOccured = false; 057 private CoordinatorJob.Status prevStatus = null; 058 059 public CoordSuspendXCommand(String id) { 060 super("coord_suspend", "coord_suspend", 1); 061 this.jobId = ParamChecker.notEmpty(id, "id"); 062 } 063 064 @Override 065 public String getEntityKey() { 066 return jobId; 067 } 068 069 @Override 070 public String getKey() { 071 return getName() + "_" + jobId; 072 } 073 074 @Override 075 protected boolean isLockRequired() { 076 return true; 077 } 078 079 @Override 080 protected void loadState() throws CommandException { 081 super.eagerLoadState(); 082 try { 083 jpaService = Services.get().get(JPAService.class); 084 if (jpaService != null) { 085 this.coordJob = CoordJobQueryExecutor.getInstance() 086 .get(CoordJobQuery.GET_COORD_JOB_SUSPEND_KILL, this.jobId); 087 prevStatus = coordJob.getStatus(); 088 } 089 else { 090 throw new CommandException(ErrorCode.E0610); 091 } 092 } 093 catch (Exception ex) { 094 throw new CommandException(ErrorCode.E0603, ex.getMessage(), ex); 095 } 096 LogUtils.setLogInfo(this.coordJob, logInfo); 097 } 098 099 @Override 100 protected void verifyPrecondition() throws CommandException, PreconditionException { 101 super.eagerVerifyPrecondition(); 102 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 103 || coordJob.getStatus() == CoordinatorJob.Status.FAILED 104 || coordJob.getStatus() == CoordinatorJob.Status.KILLED 105 || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) { 106 LOG.info("CoordSuspendXCommand is not going to execute because " 107 + "job finished or failed or killed, id = " + jobId + ", status = " + coordJob.getStatus()); 108 throw new PreconditionException(ErrorCode.E0728, jobId, coordJob.getStatus().toString()); 109 } 110 } 111 112 @Override 113 public void suspendChildren() throws CommandException { 114 try { 115 //Get all running actions of a job to suspend them 116 List<CoordinatorActionBean> actionList = jpaService 117 .execute(new CoordJobGetActionsRunningJPAExecutor(jobId)); 118 for (CoordinatorActionBean action : actionList) { 119 // queue a SuspendXCommand 120 if (action.getExternalId() != null) { 121 queue(new SuspendXCommand(action.getExternalId())); 122 updateCoordAction(action); 123 LOG.debug( 124 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and queue SuspendXCommand for [{3}]", 125 action.getId(), action.getStatus(), action.getPending(), action.getExternalId()); 126 } 127 else { 128 updateCoordAction(action); 129 LOG.debug( 130 "Suspend coord action = [{0}], new status = [{1}], pending = [{2}] and external id is null", 131 action.getId(), action.getStatus(), action.getPending()); 132 } 133 134 } 135 LOG.debug("Suspended coordinator actions for the coordinator=[{0}]", jobId); 136 } 137 catch (XException ex) { 138 exceptionOccured = true; 139 throw new CommandException(ex); 140 } 141 finally { 142 if (exceptionOccured) { 143 coordJob.setStatus(CoordinatorJob.Status.FAILED); 144 coordJob.resetPending(); 145 LOG.debug("Exception happened, fail coordinator job id = " + jobId + ", status = " 146 + coordJob.getStatus()); 147 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob)); 148 } 149 } 150 } 151 152 @Override 153 public void notifyParent() throws CommandException { 154 // update bundle action 155 if (this.coordJob.getBundleId() != null) { 156 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 157 bundleStatusUpdate.call(); 158 } 159 } 160 161 @Override 162 public void updateJob() { 163 InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation()); 164 coordJob.setLastModifiedTime(new Date()); 165 coordJob.setSuspendedTime(new Date()); 166 LOG.debug("Suspend coordinator job id = " + jobId + ", status = " + coordJob.getStatus() + ", pending = " + coordJob.isPending()); 167 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING_TIME, coordJob)); 168 } 169 170 @Override 171 public void performWrites() throws CommandException { 172 try { 173 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 174 } 175 catch (JPAExecutorException jex) { 176 throw new CommandException(jex); 177 } 178 } 179 180 private void updateCoordAction(CoordinatorActionBean action) { 181 action.setStatus(CoordinatorActionBean.Status.SUSPENDED); 182 action.incrementAndGetPending(); 183 action.setLastModifiedTime(new Date()); 184 updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action)); 185 } 186 187 @Override 188 public Job getJob() { 189 return coordJob; 190 } 191 192 /** 193 * Transit job to suspended from running or to prepsuspended from prep. 194 * 195 * @see org.apache.oozie.command.TransitionXCommand#transitToNext() 196 */ 197 @Override 198 public void transitToNext() { 199 if (coordJob == null) { 200 coordJob = (CoordinatorJobBean) this.getJob(); 201 } 202 if (coordJob.getStatus() == Job.Status.PREP) { 203 coordJob.setStatus(Job.Status.PREPSUSPENDED); 204 coordJob.setStatus(StatusUtils.getStatus(coordJob)); 205 } 206 else if (coordJob.getStatus() == Job.Status.RUNNING) { 207 coordJob.setStatus(Job.Status.SUSPENDED); 208 } 209 else if (coordJob.getStatus() == Job.Status.RUNNINGWITHERROR || coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) { 210 coordJob.setStatus(Job.Status.SUSPENDEDWITHERROR); 211 } 212 else if (coordJob.getStatus() == Job.Status.PAUSED) { 213 coordJob.setStatus(Job.Status.SUSPENDED); 214 } 215 else if (coordJob.getStatus() == Job.Status.PREPPAUSED) { 216 coordJob.setStatus(Job.Status.PREPSUSPENDED); 217 } 218 coordJob.setPending(); 219 } 220 221}