001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.util.Date; 021 import java.util.List; 022 023 import org.apache.oozie.BundleActionBean; 024 import org.apache.oozie.BundleJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.client.Job; 028 import org.apache.oozie.command.CommandException; 029 import org.apache.oozie.command.PreconditionException; 030 import org.apache.oozie.command.SuspendTransitionXCommand; 031 import org.apache.oozie.command.coord.CoordSuspendXCommand; 032 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor; 033 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor; 034 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 035 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor; 036 import org.apache.oozie.executor.jpa.JPAExecutorException; 037 import org.apache.oozie.service.JPAService; 038 import org.apache.oozie.service.Services; 039 import org.apache.oozie.util.InstrumentUtils; 040 import org.apache.oozie.util.ParamChecker; 041 import org.apache.oozie.util.LogUtils; 042 043 public class BundleJobSuspendXCommand extends SuspendTransitionXCommand { 044 private final String jobId; 045 private JPAService jpaService; 046 private List<BundleActionBean> bundleActions; 047 private BundleJobBean bundleJob; 048 049 public BundleJobSuspendXCommand(String id) { 050 super("bundle_suspend", "bundle_suspend", 1); 051 this.jobId = ParamChecker.notEmpty(id, "id"); 052 } 053 054 /* (non-Javadoc) 055 * @see org.apache.oozie.command.TransitionXCommand#getJob() 056 */ 057 @Override 058 public Job getJob() { 059 return bundleJob; 060 } 061 062 /* (non-Javadoc) 063 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 064 */ 065 @Override 066 public void notifyParent() throws CommandException { 067 } 068 069 /* (non-Javadoc) 070 * @see org.apache.oozie.command.TransitionXCommand#setJob(org.apache.oozie.client.Job) 071 */ 072 @Override 073 public void setJob(Job job) { 074 } 075 076 /* (non-Javadoc) 077 * @see org.apache.oozie.command.XCommand#getEntityKey() 078 */ 079 @Override 080 protected String getEntityKey() { 081 return this.jobId; 082 } 083 084 /* (non-Javadoc) 085 * @see org.apache.oozie.command.XCommand#isLockRequired() 086 */ 087 @Override 088 protected boolean isLockRequired() { 089 return true; 090 } 091 092 /* (non-Javadoc) 093 * @see org.apache.oozie.command.XCommand#loadState() 094 */ 095 @Override 096 protected void loadState() throws CommandException { 097 jpaService = Services.get().get(JPAService.class); 098 if (jpaService == null) { 099 throw new CommandException(ErrorCode.E0610); 100 } 101 102 try { 103 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); 104 } 105 catch (Exception Ex) { 106 throw new CommandException(ErrorCode.E0604, jobId); 107 } 108 109 try { 110 bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(jobId)); 111 } 112 catch (Exception Ex) { 113 throw new CommandException(ErrorCode.E1311, jobId); 114 } 115 116 LogUtils.setLogInfo(bundleJob, logInfo); 117 } 118 119 /* (non-Javadoc) 120 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 121 */ 122 @Override 123 protected void verifyPrecondition() throws CommandException, PreconditionException { 124 if (bundleJob.getStatus() == Job.Status.SUCCEEDED || bundleJob.getStatus() == Job.Status.FAILED 125 || bundleJob.getStatus() == Job.Status.KILLED) { 126 LOG.info("BundleJobSuspendXCommand is not going to execute because job finished or failed or killed, id = " 127 + jobId + ", status = " + bundleJob.getStatus()); 128 throw new PreconditionException(ErrorCode.E1312, jobId, bundleJob.getStatus().toString()); 129 } 130 } 131 132 /* (non-Javadoc) 133 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 134 */ 135 @Override 136 public void updateJob() throws CommandException { 137 InstrumentUtils.incrJobCounter("bundle_suspend", 1, null); 138 bundleJob.setSuspendedTime(new Date()); 139 bundleJob.setLastModifiedTime(new Date()); 140 141 LOG.debug("Suspend bundle job id = " + jobId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending()); 142 try { 143 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 144 } 145 catch (JPAExecutorException e) { 146 throw new CommandException(e); 147 } 148 } 149 150 @Override 151 public void suspendChildren() throws CommandException { 152 try { 153 for (BundleActionBean action : this.bundleActions) { 154 if (action.getStatus() == Job.Status.RUNNING || action.getStatus() == Job.Status.PREP) { 155 // queue a CoordSuspendXCommand 156 if (action.getCoordId() != null) { 157 queue(new CoordSuspendXCommand(action.getCoordId())); 158 updateBundleAction(action); 159 LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordSuspendXCommand for [{3}]", 160 action.getBundleActionId(), action.getStatus(), action.getPending(), action.getCoordId()); 161 } else { 162 updateBundleAction(action); 163 LOG.debug("Suspend bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null", 164 action.getBundleActionId(), action.getStatus(), action.getPending()); 165 } 166 167 } 168 } 169 LOG.debug("Suspended bundle actions for the bundle=[{0}]", jobId); 170 } 171 catch (XException ex) { 172 throw new CommandException(ex); 173 } 174 } 175 176 private void updateBundleAction(BundleActionBean action) throws CommandException { 177 if (action.getStatus() == Job.Status.PREP) { 178 action.setStatus(Job.Status.PREPSUSPENDED); 179 } 180 else if (action.getStatus() == Job.Status.RUNNING) { 181 action.setStatus(Job.Status.SUSPENDED); 182 } 183 action.incrementAndGetPending(); 184 action.setLastModifiedTime(new Date()); 185 try { 186 jpaService.execute(new BundleActionUpdateJPAExecutor(action)); 187 } 188 catch (JPAExecutorException e) { 189 throw new CommandException(e); 190 } 191 } 192 }