001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.bundle; 019 020 import java.util.Date; 021 import java.util.List; 022 023 import org.apache.oozie.BundleActionBean; 024 import org.apache.oozie.BundleJobBean; 025 import org.apache.oozie.ErrorCode; 026 import org.apache.oozie.client.Job; 027 import org.apache.oozie.command.CommandException; 028 import org.apache.oozie.command.PreconditionException; 029 import org.apache.oozie.command.ResumeTransitionXCommand; 030 import org.apache.oozie.command.coord.CoordResumeXCommand; 031 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor; 032 import org.apache.oozie.executor.jpa.BundleActionsGetJPAExecutor; 033 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; 034 import org.apache.oozie.executor.jpa.JPAExecutorException; 035 import org.apache.oozie.service.JPAService; 036 import org.apache.oozie.service.Services; 037 import org.apache.oozie.util.InstrumentUtils; 038 import org.apache.oozie.util.LogUtils; 039 import org.apache.oozie.util.ParamChecker; 040 041 public class BundleJobResumeXCommand extends ResumeTransitionXCommand { 042 043 private final String bundleId; 044 private JPAService jpaService = null; 045 private BundleJobBean bundleJob; 046 private List<BundleActionBean> bundleActions; 047 048 /** 049 * Constructor to create the Bundle Resume Command. 050 * 051 * @param jobId : Bundle Id 052 */ 053 public BundleJobResumeXCommand(String jobId) { 054 super("bundle_resume", "bundle_resume", 1); 055 this.bundleId = ParamChecker.notNull(jobId, "BundleId"); 056 } 057 058 /* (non-Javadoc) 059 * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren() 060 */ 061 @Override 062 public void resumeChildren() { 063 for (BundleActionBean action : bundleActions) { 064 if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.SUSPENDEDWITHERROR || action.getStatus() == Job.Status.PREPSUSPENDED) { 065 // queue a CoordResumeXCommand 066 if (action.getCoordId() != null) { 067 queue(new CoordResumeXCommand(action.getCoordId())); 068 updateBundleAction(action); 069 LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]", 070 action.getBundleActionId(), action.getStatus(), action.getPending(), action 071 .getCoordId()); 072 } 073 else { 074 updateBundleAction(action); 075 LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null", 076 action.getBundleActionId(), action.getStatus(), action.getPending()); 077 } 078 } 079 } 080 LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId); 081 } 082 083 private void updateBundleAction(BundleActionBean action) { 084 if (action.getStatus() == Job.Status.PREPSUSPENDED) { 085 action.setStatus(Job.Status.PREP); 086 } 087 else if (action.getStatus() == Job.Status.SUSPENDED) { 088 action.setStatus(Job.Status.RUNNING); 089 } 090 else if (action.getStatus() == Job.Status.SUSPENDEDWITHERROR) { 091 action.setStatus(Job.Status.RUNNINGWITHERROR); 092 } 093 action.incrementAndGetPending(); 094 action.setLastModifiedTime(new Date()); 095 updateList.add(action); 096 } 097 098 /* (non-Javadoc) 099 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 100 */ 101 @Override 102 public void notifyParent() throws CommandException { 103 104 } 105 106 /* (non-Javadoc) 107 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 108 */ 109 @Override 110 public void updateJob() { 111 InstrumentUtils.incrJobCounter("bundle_resume", 1, null); 112 bundleJob.setSuspendedTime(null); 113 bundleJob.setLastModifiedTime(new Date()); 114 LOG.debug("Resume bundle job id = " + bundleId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending()); 115 updateList.add(bundleJob); 116 } 117 118 /* (non-Javadoc) 119 * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites() 120 */ 121 @Override 122 public void performWrites() throws CommandException { 123 try { 124 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null)); 125 } 126 catch (JPAExecutorException e) { 127 throw new CommandException(e); 128 } 129 } 130 131 /* (non-Javadoc) 132 * @see org.apache.oozie.command.XCommand#getEntityKey() 133 */ 134 @Override 135 public String getEntityKey() { 136 return bundleId; 137 } 138 139 /* (non-Javadoc) 140 * @see org.apache.oozie.command.XCommand#isLockRequired() 141 */ 142 @Override 143 protected boolean isLockRequired() { 144 return true; 145 } 146 147 /* (non-Javadoc) 148 * @see org.apache.oozie.command.XCommand#loadState() 149 */ 150 @Override 151 protected void loadState() throws CommandException { 152 jpaService = Services.get().get(JPAService.class); 153 if (jpaService == null) { 154 throw new CommandException(ErrorCode.E0610); 155 } 156 157 try { 158 bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); 159 bundleActions = jpaService.execute(new BundleActionsGetJPAExecutor(bundleId)); 160 } 161 catch (Exception Ex) { 162 throw new CommandException(ErrorCode.E0604, bundleId); 163 } 164 165 LogUtils.setLogInfo(bundleJob, logInfo); 166 } 167 168 /* (non-Javadoc) 169 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 170 */ 171 @Override 172 protected void verifyPrecondition() throws CommandException, PreconditionException { 173 if (bundleJob.getStatus() != Job.Status.SUSPENDED && bundleJob.getStatus() != Job.Status.SUSPENDEDWITHERROR && bundleJob.getStatus() != Job.Status.PREPSUSPENDED) { 174 throw new PreconditionException(ErrorCode.E1100, "BundleResumeCommand not Resumed - " 175 + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state " + bundleId); 176 } 177 } 178 179 /* (non-Javadoc) 180 * @see org.apache.oozie.command.TransitionXCommand#getJob() 181 */ 182 @Override 183 public Job getJob() { 184 return bundleJob; 185 } 186 }