001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.bundle; 019 020import java.util.Date; 021import java.util.List; 022 023import org.apache.oozie.BundleActionBean; 024import org.apache.oozie.BundleJobBean; 025import org.apache.oozie.ErrorCode; 026import org.apache.oozie.client.Job; 027import org.apache.oozie.command.CommandException; 028import org.apache.oozie.command.PreconditionException; 029import org.apache.oozie.command.ResumeTransitionXCommand; 030import org.apache.oozie.command.coord.CoordResumeXCommand; 031import org.apache.oozie.executor.jpa.BatchQueryExecutor; 032import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 033import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 034import org.apache.oozie.executor.jpa.JPAExecutorException; 035import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 036import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 037import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 038import org.apache.oozie.service.JPAService; 039import org.apache.oozie.service.Services; 040import org.apache.oozie.util.InstrumentUtils; 041import org.apache.oozie.util.LogUtils; 042import org.apache.oozie.util.ParamChecker; 043 044public class BundleJobResumeXCommand extends ResumeTransitionXCommand { 045 046 private final String bundleId; 047 private JPAService jpaService = null; 048 private BundleJobBean bundleJob; 049 private List<BundleActionBean> bundleActions; 050 051 /** 052 * Constructor to create the Bundle Resume Command. 053 * 054 * @param jobId : Bundle Id 055 */ 056 public BundleJobResumeXCommand(String jobId) { 057 super("bundle_resume", "bundle_resume", 1); 058 this.bundleId = ParamChecker.notNull(jobId, "BundleId"); 059 } 060 061 /* (non-Javadoc) 062 * @see org.apache.oozie.command.ResumeTransitionXCommand#resumeChildren() 063 */ 064 @Override 065 public void resumeChildren() { 066 for (BundleActionBean action : bundleActions) { 067 if (action.getStatus() == Job.Status.SUSPENDED || action.getStatus() == Job.Status.SUSPENDEDWITHERROR || action.getStatus() == Job.Status.PREPSUSPENDED) { 068 // queue a CoordResumeXCommand 069 if (action.getCoordId() != null) { 070 queue(new CoordResumeXCommand(action.getCoordId())); 071 updateBundleAction(action); 072 LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and queue CoordResumeXCommand for [{3}]", 073 action.getBundleActionId(), action.getStatus(), action.getPending(), action 074 .getCoordId()); 075 } 076 else { 077 updateBundleAction(action); 078 LOG.debug("Resume bundle action = [{0}], new status = [{1}], pending = [{2}] and coord id is null", 079 action.getBundleActionId(), action.getStatus(), action.getPending()); 080 } 081 } 082 } 083 LOG.debug("Resume bundle actions for the bundle=[{0}]", bundleId); 084 } 085 086 private void updateBundleAction(BundleActionBean action) { 087 if (action.getStatus() == Job.Status.PREPSUSPENDED) { 088 action.setStatus(Job.Status.PREP); 089 } 090 else if (action.getStatus() == Job.Status.SUSPENDED) { 091 action.setStatus(Job.Status.RUNNING); 092 } 093 else if (action.getStatus() == Job.Status.SUSPENDEDWITHERROR) { 094 action.setStatus(Job.Status.RUNNINGWITHERROR); 095 } 096 action.incrementAndGetPending(); 097 action.setLastModifiedTime(new Date()); 098 updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME, action)); 099 } 100 101 /* (non-Javadoc) 102 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 103 */ 104 @Override 105 public void notifyParent() throws CommandException { 106 107 } 108 109 /* (non-Javadoc) 110 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 111 */ 112 @Override 113 public void updateJob() { 114 InstrumentUtils.incrJobCounter("bundle_resume", 1, null); 115 bundleJob.setSuspendedTime(null); 116 bundleJob.setLastModifiedTime(new Date()); 117 LOG.debug("Resume bundle job id = " + bundleId + ", status = " + bundleJob.getStatus() + ", pending = " + bundleJob.isPending()); 118 updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME, bundleJob)); 119 } 120 121 /* (non-Javadoc) 122 * @see org.apache.oozie.command.ResumeTransitionXCommand#performWrites() 123 */ 124 @Override 125 public void performWrites() throws CommandException { 126 try { 127 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 128 } 129 catch (JPAExecutorException e) { 130 throw new CommandException(e); 131 } 132 } 133 134 /* (non-Javadoc) 135 * @see org.apache.oozie.command.XCommand#getEntityKey() 136 */ 137 @Override 138 public String getEntityKey() { 139 return bundleId; 140 } 141 142 /* (non-Javadoc) 143 * @see org.apache.oozie.command.XCommand#isLockRequired() 144 */ 145 @Override 146 protected boolean isLockRequired() { 147 return true; 148 } 149 150 /* (non-Javadoc) 151 * @see org.apache.oozie.command.XCommand#loadState() 152 */ 153 @Override 154 protected void loadState() throws CommandException { 155 jpaService = Services.get().get(JPAService.class); 156 if (jpaService == null) { 157 throw new CommandException(ErrorCode.E0610); 158 } 159 160 try { 161 bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, bundleId); 162 bundleActions = BundleActionQueryExecutor.getInstance().getList( 163 BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, bundleId); 164 } 165 catch (Exception Ex) { 166 throw new CommandException(ErrorCode.E0604, bundleId); 167 } 168 169 LogUtils.setLogInfo(bundleJob, logInfo); 170 } 171 172 /* (non-Javadoc) 173 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 174 */ 175 @Override 176 protected void verifyPrecondition() throws CommandException, PreconditionException { 177 if (bundleJob.getStatus() != Job.Status.SUSPENDED && bundleJob.getStatus() != Job.Status.SUSPENDEDWITHERROR && bundleJob.getStatus() != Job.Status.PREPSUSPENDED) { 178 throw new PreconditionException(ErrorCode.E1100, "BundleResumeCommand not Resumed - " 179 + "job not in SUSPENDED/SUSPENDEDWITHERROR/PREPSUSPENDED state " + bundleId); 180 } 181 } 182 183 /* (non-Javadoc) 184 * @see org.apache.oozie.command.TransitionXCommand#getJob() 185 */ 186 @Override 187 public Job getJob() { 188 return bundleJob; 189 } 190}