/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.command.bundle;

import java.util.Date;
import java.util.List;

import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.ResumeTransitionXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;

/**
 * Command that resumes a suspended bundle job.
 * <p>
 * For every bundle action that is in one of the suspended states the command moves the action back to the
 * matching active state, marks it pending, and (when the action is tied to a coordinator) queues a
 * {@link CoordResumeXCommand} for that coordinator. All bean changes are collected in the inherited
 * {@code updateList} and written in a single batch by {@link #performWrites()}.
 */
public class BundleJobResumeXCommand extends ResumeTransitionXCommand {

    /** Id of the bundle job being resumed; never null once constructed. */
    private final String bundleId;
    /** JPA service looked up in {@link #loadState()}; only used to verify that persistence is available. */
    private JPAService jpaService = null;
    /** Bundle job bean loaded in {@link #loadState()}. */
    private BundleJobBean bundleJob;
    /** Bundle actions of the job loaded in {@link #loadState()}. */
    private List<BundleActionBean> bundleActions;

    /**
     * Constructor to create the Bundle Resume Command.
     *
     * @param jobId : Bundle Id, checked with {@link ParamChecker#notNull(Object, String)}
     */
    public BundleJobResumeXCommand(String jobId) {
        super("bundle_resume", "bundle_resume", 1);
        this.bundleId = ParamChecker.notNull(jobId, "BundleId");
    }

    /**
     * Resumes every bundle action that is currently in a suspended state.
     * <p>
     * Actions with a coordinator id additionally get a {@link CoordResumeXCommand} queued for that
     * coordinator; actions without one are only updated. Actions in any other state are left untouched.
     *
     * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren()
     */
    @Override
    public void resumeChildren() {
        for (BundleActionBean action : bundleActions) {
            if (!isSuspendedState(action.getStatus())) {
                continue;
            }
            final String coordId = action.getCoordId();
            if (coordId != null) {
                // queue a CoordResumeXCommand
                queue(new CoordResumeXCommand(coordId));
                updateBundleAction(action);
                LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]",
                        action.getBundleActionId(), action.getStatus(), action.getPending(), coordId);
            }
            else {
                updateBundleAction(action);
                LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null",
                        action.getBundleActionId(), action.getStatus(), action.getPending());
            }
        }
        LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId);
    }

    /**
     * Tells whether a status is one of the three suspended states this command is able to resume.
     *
     * @param status status to test, may be null
     * @return true for SUSPENDED, SUSPENDEDWITHERROR and PREPSUSPENDED, false for anything else
     */
    private static boolean isSuspendedState(Job.Status status) {
        return status == Job.Status.SUSPENDED
                || status == Job.Status.SUSPENDEDWITHERROR
                || status == Job.Status.PREPSUSPENDED;
    }

    /**
     * Moves a suspended bundle action back to its active counterpart (PREPSUSPENDED to PREP, SUSPENDED to
     * RUNNING, SUSPENDEDWITHERROR to RUNNINGWITHERROR), increments its pending counter, refreshes its
     * last-modified time and registers the bean for the batch update.
     *
     * @param action bundle action to update
     */
    private void updateBundleAction(BundleActionBean action) {
        if (action.getStatus() == Job.Status.PREPSUSPENDED) {
            action.setStatus(Job.Status.PREP);
        }
        else if (action.getStatus() == Job.Status.SUSPENDED) {
            action.setStatus(Job.Status.RUNNING);
        }
        else if (action.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
            action.setStatus(Job.Status.RUNNINGWITHERROR);
        }
        action.incrementAndGetPending();
        action.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<BundleActionQuery>(
                BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action));
    }

    /**
     * Intentionally empty: this command does nothing when asked to notify a parent.
     *
     * @see org.apache.oozie.command.TransitionXCommand#notifyParent()
     */
    @Override
    public void notifyParent() throws CommandException {

    }

    /**
     * Bumps the "bundle_resume" job counter, clears the suspended time of the bundle job, refreshes its
     * last-modified time and registers the bean for the batch update. The job status itself is not changed
     * here.
     *
     * @see org.apache.oozie.command.TransitionXCommand#updateJob()
     */
    @Override
    public void updateJob() {
        InstrumentUtils.incrJobCounter("bundle_resume", 1, null);
        bundleJob.setSuspendedTime(null);
        bundleJob.setLastModifiedTime(new Date());
        LOG.debug("Resume bundle job id = " + bundleId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending());
        updateList.add(new UpdateEntry<BundleJobQuery>(
                BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME, bundleJob));
    }

    /**
     * Writes every entry collected in {@code updateList} through the batch query executor.
     *
     * @throws CommandException wrapping the {@link JPAExecutorException} raised by the batch write
     * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites()
     */
    @Override
    public void performWrites() throws CommandException {
        try {
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /**
     * Returns the bundle id as the entity key of this command.
     *
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return bundleId;
    }

    /**
     * Always true: this command asks for a lock.
     *
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }

    /**
     * Loads the bundle job and its bundle actions and sets the log info from the loaded job.
     *
     * @throws CommandException with {@link ErrorCode#E0610} when no {@link JPAService} is registered, or
     *         with {@link ErrorCode#E0604} (carrying the original failure as its cause) when loading the
     *         job or its actions fails
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }

        try {
            bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, bundleId);
            bundleActions = BundleActionQueryExecutor.getInstance().getList(
                    BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, bundleId);
        }
        catch (Exception ex) {
            // Pass the caught exception as the trailing parameter so that it is kept as the cause
            // (XException uses a trailing Throwable parameter as the cause); the error code is unchanged.
            throw new CommandException(ErrorCode.E0604, bundleId, ex);
        }

        LogUtils.setLogInfo(bundleJob);
    }

    /**
     * Checks that the bundle job is in a state that can be resumed.
     *
     * @throws PreconditionException with {@link ErrorCode#E1100} when the job is not in SUSPENDED,
     *         SUSPENDEDWITHERROR or PREPSUSPENDED state
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        if (!isSuspendedState(bundleJob.getStatus())) {
            throw new PreconditionException(ErrorCode.E1100, "BundleResumeCommand not Resumed - "
                    + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state " + bundleId);
        }
    }

    /**
     * Returns the bundle job bean loaded by {@link #loadState()}.
     *
     * @see org.apache.oozie.command.TransitionXCommand#getJob()
     */
    @Override
    public Job getJob() {
        return bundleJob;
    }
}