/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.bundle;

import java.util.Date;
import java.util.List;

import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.SuspendTransitionXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.LogUtils;

/**
 * Command that suspends a bundle job.
 * <p>
 * It loads the bundle job and its bundle actions, refuses to run when the job is already in a
 * terminal state, stamps the job with the suspend / last-modified times, moves every still-active
 * bundle action to its suspended counterpart and queues a {@link CoordSuspendXCommand} for each
 * action that already has a coordinator id. All bean changes are collected in the inherited
 * {@code updateList} and persisted in one batch by {@link #performWrites()}.
 * <p>
 * NOTE(review): the order in which these hooks are invoked, and the status transition of the bundle
 * job itself, are driven by the {@link SuspendTransitionXCommand} superclass, which is not part of
 * this file.
 */
public class BundleJobSuspendXCommand extends SuspendTransitionXCommand {
    /** Id of the bundle job to suspend; never empty. */
    private final String jobId;
    /** Bundle actions of the job, populated by {@link #loadState()}. */
    private List<BundleActionBean> bundleActions;
    /** The bundle job bean, populated by {@link #loadState()}. */
    private BundleJobBean bundleJob;

    /**
     * Creates a suspend command for the given bundle job.
     *
     * @param id bundle job id; must not be empty (checked with {@link ParamChecker#notEmpty})
     */
    public BundleJobSuspendXCommand(String id) {
        super("bundle_suspend", "bundle_suspend", 1);
        this.jobId = ParamChecker.notEmpty(id, "id");
    }

    /**
     * Returns the bundle job loaded by {@link #loadState()}.
     *
     * @return the bundle job bean, or {@code null} if the state has not been loaded yet
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return bundleJob;
    }

    /**
     * Intentionally empty: this command does not notify any parent.
     *
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() throws CommandException {
    }

    /**
     * Intentionally empty: the job is always obtained through {@link #loadState()}, so the supplied
     * job is ignored.
     *
     * @param job ignored
     * @see org.apache.oozie.command.TransitionXCommand#setJob(org.apache.oozie.client.Job)
     */
    @Override
    public void setJob(Job job) {
    }

    /**
     * Persists every update collected in the inherited {@code updateList} in a single batch.
     *
     * @throws CommandException wrapping the {@link JPAExecutorException} if the batch fails
     * @see org.apache.oozie.command.SuspendTransitionXCommand#performWrites()
     */
    @Override
    public void performWrites() throws CommandException {
        try {
            // Only updates are issued; there is nothing to insert or delete.
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /**
     * Returns the key this command is identified by, which is the bundle job id.
     *
     * @return the bundle job id
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * This command always requires a lock.
     *
     * @return {@code true}
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Loads the bundle job and its bundle actions, then sets the log info from the loaded job.
     *
     * @throws CommandException with {@link ErrorCode#E0604} if the bundle job cannot be loaded, or
     *         with {@link ErrorCode#E1311} if its bundle actions cannot be loaded; in both cases the
     *         underlying exception is attached as the cause
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId);
        }
        catch (Exception e) {
            // The trailing exception parameter is picked up as the cause (see XException); the
            // message template only references {0}, so the rendered message stays the same.
            throw new CommandException(ErrorCode.E0604, jobId, e);
        }

        try {
            bundleActions = BundleActionQueryExecutor.getInstance().getList(
                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, bundleJob.getId());
        }
        catch (Exception e) {
            // Same as above: keep the root cause instead of discarding it.
            throw new CommandException(ErrorCode.E1311, jobId, e);
        }

        LogUtils.setLogInfo(bundleJob, logInfo);
    }

    /**
     * Rejects the suspend request when the bundle job is already SUCCEEDED, FAILED, KILLED or
     * DONEWITHERROR.
     *
     * @throws PreconditionException with {@link ErrorCode#E1312} if the job is in one of those states
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        final Job.Status status = bundleJob.getStatus();
        final boolean finished = status == Job.Status.SUCCEEDED || status == Job.Status.FAILED
                || status == Job.Status.KILLED || status == Job.Status.DONEWITHERROR;
        if (finished) {
            LOG.info("BundleJobSuspendXCommand is not going to execute because job either succeeded, failed, killed, or donewitherror; id = "
                            + jobId + ", status = " + status);
            throw new PreconditionException(ErrorCode.E1312, jobId, status.toString());
        }
    }

    /**
     * Bumps the {@code bundle_suspend} job counter, stamps the bundle job with the suspend and
     * last-modified times and schedules the job bean for update.
     *
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() {
        InstrumentUtils.incrJobCounter("bundle_suspend", 1, null);
        bundleJob.setSuspendedTime(new Date());
        bundleJob.setLastModifiedTime(new Date());

        LOG.debug("Suspend bundle job id = " + jobId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
        updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME, bundleJob));
    }

    /**
     * Suspends every bundle action that is still PREP, RUNNING, RUNNINGWITHERROR, PAUSED or
     * PAUSEDWITHERROR. For an action that already has a coordinator id a
     * {@link CoordSuspendXCommand} is queued as well; an action without one is only updated.
     *
     * @throws CommandException declared by the overridden method; not thrown directly by this
     *         implementation
     */
    @Override
    public void suspendChildren() throws CommandException {
        for (BundleActionBean action : this.bundleActions) {
            if (!isSuspendable(action.getStatus())) {
                continue;
            }

            final String coordId = action.getCoordId();
            if (coordId != null) {
                // queue a CoordSuspendXCommand
                queue(new CoordSuspendXCommand(coordId));
            }

            updateBundleAction(action);

            // Logged after the update so that status and pending reflect the new values.
            if (coordId != null) {
                LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]",
                        action.getBundleActionId(), action.getStatus(), action.getPending(), coordId);
            }
            else {
                LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
                        action.getBundleActionId(), action.getStatus(), action.getPending());
            }
        }
        LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId);
    }

    /**
     * Tells whether a bundle action in the given status is handled by {@link #suspendChildren()}.
     *
     * @param status current status of the bundle action; may be {@code null}
     * @return {@code true} for RUNNING, RUNNINGWITHERROR, PREP, PAUSED and PAUSEDWITHERROR
     */
    private static boolean isSuspendable(Job.Status status) {
        return status == Job.Status.RUNNING || status == Job.Status.RUNNINGWITHERROR
                || status == Job.Status.PREP || status == Job.Status.PAUSED
                || status == Job.Status.PAUSEDWITHERROR;
    }

    /**
     * Moves a bundle action to its suspended counterpart (PREP to PREPSUSPENDED, RUNNING / PAUSED to
     * SUSPENDED, RUNNINGWITHERROR / PAUSEDWITHERROR to SUSPENDEDWITHERROR; any other status is left
     * untouched), increments its pending counter, refreshes its last-modified time and schedules the
     * bean for update.
     *
     * @param action the bundle action to update
     */
    private void updateBundleAction(BundleActionBean action) {
        final Job.Status current = action.getStatus();
        if (current == Job.Status.PREP) {
            action.setStatus(Job.Status.PREPSUSPENDED);
        }
        else if (current == Job.Status.RUNNING || current == Job.Status.PAUSED) {
            action.setStatus(Job.Status.SUSPENDED);
        }
        else if (current == Job.Status.RUNNINGWITHERROR || current == Job.Status.PAUSEDWITHERROR) {
            action.setStatus(Job.Status.SUSPENDEDWITHERROR);
        }

        action.incrementAndGetPending();
        action.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<BundleActionQuery>(
                BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action));
    }
}