001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.util.Date; 021 import java.util.List; 022 023 import org.apache.oozie.BundleActionBean; 024 import org.apache.oozie.BundleJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.XException; 027 import org.apache.oozie.client.Job; 028 import org.apache.oozie.command.CommandException; 029 import org.apache.oozie.command.PreconditionException; 030 import org.apache.oozie.command.ResumeTransitionXCommand; 031 import org.apache.oozie.command.coord.CoordResumeXCommand; 032 import org.apache.oozie.executor.jpa.BundleActionUpdateJPAExecutor; 033 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor; 034 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 035 import org.apache.oozie.executor.jpa.BundleJobUpdateJPAExecutor; 036 import org.apache.oozie.executor.jpa.JPAExecutorException; 037 import org.apache.oozie.service.JPAService; 038 import org.apache.oozie.service.Services; 039 import org.apache.oozie.util.InstrumentUtils; 040 import org.apache.oozie.util.LogUtils; 041 import org.apache.oozie.util.ParamChecker; 042 043 public class BundleJobResumeXCommand extends ResumeTransitionXCommand { 044 045 private final String bundleId; 046 private JPAService jpaService = null; 047 private BundleJobBean bundleJob; 048 private List<BundleActionBean> bundleActions; 049 050 /** 051 * Constructor to create the Bundle Resume Command. 052 * 053 * @param jobId : Bundle Id 054 */ 055 public BundleJobResumeXCommand(String jobId) { 056 super("bundle_resume", "bundle_resume", 1); 057 this.bundleId = ParamChecker.notNull(jobId, "BundleId"); 058 } 059 060 /* (non-Javadoc) 061 * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren() 062 */ 063 @Override 064 public void resumeChildren() throws CommandException { 065 try { 066 for (BundleActionBean action : bundleActions) { 067 if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.PREPSUSPENDED) { 068 // queue a CoordResumeXCommand 069 if (action.getCoordId() != null) { 070 queue(new CoordResumeXCommand(action.getCoordId())); 071 updateBundleAction(action); 072 LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]", 073 action.getBundleActionId(), action.getStatus(), action.getPending(), action 074 .getCoordId()); 075 } 076 else { 077 updateBundleAction(action); 078 LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null", 079 action.getBundleActionId(), action.getStatus(), action.getPending()); 080 } 081 } 082 } 083 LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId); 084 } 085 catch (XException ex) { 086 throw new CommandException(ex); 087 } 088 } 089 090 private void updateBundleAction(BundleActionBean action) throws CommandException { 091 if (action.getStatus() == Job.Status.PREPSUSPENDED) { 092 action.setStatus(Job.Status.PREP); 093 } 094 else if (action.getStatus() == Job.Status.SUSPENDED) { 095 action.setStatus(Job.Status.RUNNING); 096 } 097 action.incrementAndGetPending(); 098 action.setLastModifiedTime(new Date()); 099 try { 100 jpaService.execute(new BundleActionUpdateJPAExecutor(action)); 101 } 102 catch (JPAExecutorException e) { 103 throw new CommandException(e); 104 } 105 } 106 107 /* (non-Javadoc) 108 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 109 */ 110 @Override 111 public void notifyParent() throws CommandException { 112 113 } 114 115 /* (non-Javadoc) 116 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 117 */ 118 @Override 119 public void updateJob() throws CommandException { 120 InstrumentUtils.incrJobCounter("bundle_resume", 1, null); 121 bundleJob.setSuspendedTime(null); 122 bundleJob.setLastModifiedTime(new Date()); 123 LOG.debug("Resume bundle job id = " + bundleId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending()); 124 try { 125 jpaService.execute(new BundleJobUpdateJPAExecutor(bundleJob)); 126 } 127 catch (JPAExecutorException e) { 128 throw new CommandException(e); 129 } 130 } 131 132 /* (non-Javadoc) 133 * @see org.apache.oozie.command.XCommand#getEntityKey() 134 */ 135 @Override 136 protected String getEntityKey() { 137 return bundleId; 138 } 139 140 /* (non-Javadoc) 141 * @see org.apache.oozie.command.XCommand#isLockRequired() 142 */ 143 @Override 144 protected boolean isLockRequired() { 145 return true; 146 } 147 148 /* (non-Javadoc) 149 * @see org.apache.oozie.command.XCommand#loadState() 150 */ 151 @Override 152 protected void loadState() throws CommandException { 153 jpaService = Services.get().get(JPAService.class); 154 if (jpaService == null) { 155 throw new CommandException(ErrorCode.E0610); 156 } 157 158 try { 159 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); 160 bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(bundleId)); 161 } 162 catch (Exception Ex) { 163 throw new CommandException(ErrorCode.E0604, bundleId); 164 } 165 166 LogUtils.setLogInfo(bundleJob, logInfo); 167 } 168 169 /* (non-Javadoc) 170 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 171 */ 172 @Override 173 protected void verifyPrecondition() throws CommandException, PreconditionException { 174 if (bundleJob.getStatus() != Job.Status.SUSPENDED && bundleJob.getStatus() != Job.Status.PREPSUSPENDED) { 175 throw new PreconditionException(ErrorCode.E1100, "BundleResumeCommand not Resumed - " 176 + "job not in SUSPENDED/PREPSUSPENDED state " + bundleId); 177 } 178 } 179 180 /* (non-Javadoc) 181 * @see org.apache.oozie.command.TransitionXCommand#getJob() 182 */ 183 @Override 184 public Job getJob() { 185 return bundleJob; 186 } 187 }