001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.bundle; 019 020import java.util.Date; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024 025import org.apache.oozie.BundleActionBean; 026import org.apache.oozie.BundleJobBean; 027import org.apache.oozie.CoordinatorJobBean; 028import org.apache.oozie.XException; 029import org.apache.oozie.client.Job; 030import org.apache.oozie.client.rest.RestConstants; 031import org.apache.oozie.command.CommandException; 032import org.apache.oozie.command.RerunTransitionXCommand; 033import org.apache.oozie.command.coord.CoordRerunXCommand; 034import org.apache.oozie.executor.jpa.BatchQueryExecutor; 035import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; 036import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 037import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; 038import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 039import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 040import org.apache.oozie.executor.jpa.JPAExecutorException; 041import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; 042import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 043import org.apache.oozie.util.DateUtils; 044import org.apache.oozie.util.LogUtils; 045import org.apache.oozie.util.ParamChecker; 046import org.apache.oozie.util.XLog; 047 048/** 049 * Rerun bundle coordinator jobs by a list of coordinator names or dates. User can specify if refresh or noCleanup. 050 * <p/> 051 * The "refresh" is used to indicate if user wants to refresh an action's input/outpur dataset urls 052 * <p/> 053 * The "noCleanup" is used to indicate if user wants to cleanup output events for given rerun actions 054 */ 055public class BundleRerunXCommand extends RerunTransitionXCommand<Void> { 056 057 private final String coordScope; 058 private final String dateScope; 059 private final boolean refresh; 060 private final boolean noCleanup; 061 private BundleJobBean bundleJob; 062 private List<BundleActionBean> bundleActions; 063 protected boolean prevPending; 064 065 /** 066 * The constructor for class {@link BundleRerunXCommand} 067 * 068 * @param jobId the bundle job id 069 * @param coordScope the rerun scope for coordinator job names separated by "," 070 * @param dateScope the rerun scope for coordinator nominal times separated by "," 071 * @param refresh true if user wants to refresh input/outpur dataset urls 072 * @param noCleanup false if user wants to cleanup output events for given rerun actions 073 */ 074 public BundleRerunXCommand(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) { 075 super("bundle_rerun", "bundle_rerun", 1); 076 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 077 this.coordScope = coordScope; 078 this.dateScope = dateScope; 079 this.refresh = refresh; 080 this.noCleanup = noCleanup; 081 } 082 083 /* (non-Javadoc) 084 * @see org.apache.oozie.command.XCommand#loadState() 085 */ 086 @Override 087 protected void loadState() throws CommandException { 088 try { 089 this.bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB, jobId); 090 this.bundleActions = BundleActionQueryExecutor.getInstance().getList( 091 BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, jobId); 092 LogUtils.setLogInfo(bundleJob, logInfo); 093 super.setJob(bundleJob); 094 prevPending = bundleJob.isPending(); 095 } 096 catch (XException ex) { 097 throw new CommandException(ex); 098 } 099 100 } 101 102 /* (non-Javadoc) 103 * @see org.apache.oozie.command.RerunTransitionXCommand#rerunChildren() 104 */ 105 @Override 106 public void rerunChildren() throws CommandException { 107 boolean isUpdateActionDone = false; 108 Map<String, BundleActionBean> coordNameToBAMapping = new HashMap<String, BundleActionBean>(); 109 if (bundleActions != null) { 110 for (BundleActionBean action : bundleActions) { 111 if (action.getCoordName() != null) { 112 coordNameToBAMapping.put(action.getCoordName(), action); 113 } 114 } 115 } 116 117 if (coordScope != null && !coordScope.isEmpty()) { 118 String[] list = coordScope.split(","); 119 for (String coordName : list) { 120 coordName = coordName.trim(); 121 if (coordNameToBAMapping.keySet().contains(coordName)) { 122 String coordId = coordNameToBAMapping.get(coordName).getCoordId(); 123 if (coordId == null) { 124 LOG.info("No coord id found. Therefore, nothing to queue for coord rerun for coordname: " + coordName); 125 continue; 126 } 127 CoordinatorJobBean coordJob = getCoordJob(coordId); 128 129 String rerunDateScope; 130 if (dateScope != null && !dateScope.isEmpty()) { 131 rerunDateScope = dateScope; 132 } 133 else { 134 String coordStart = DateUtils.formatDateOozieTZ(coordJob.getStartTime()); 135 String coordEnd = DateUtils.formatDateOozieTZ(coordJob.getEndTime()); 136 rerunDateScope = coordStart + "::" + coordEnd; 137 } 138 LOG.debug("Queuing rerun range [" + rerunDateScope + "] for coord id " + coordId + " of bundle " 139 + bundleJob.getId()); 140 queue(new CoordRerunXCommand(coordId, RestConstants.JOB_COORD_SCOPE_DATE, rerunDateScope, refresh, 141 noCleanup)); 142 updateBundleAction(coordNameToBAMapping.get(coordName)); 143 isUpdateActionDone = true; 144 } 145 else { 146 LOG.info("Rerun for coord " + coordName + " NOT performed because it is not in bundle ", bundleJob.getId()); 147 } 148 } 149 } 150 else if (dateScope != null && !dateScope.isEmpty()) { 151 if (bundleActions != null) { 152 for (BundleActionBean action : bundleActions) { 153 if (action.getCoordId() == null) { 154 LOG.info("No coord id found. Therefore nothing to queue for coord rerun with coord name " 155 + action.getCoordName()); 156 continue; 157 } 158 LOG.debug("Queuing rerun range [" + dateScope + "] for coord id " + action.getCoordId() + " of bundle " 159 + bundleJob.getId()); 160 queue(new CoordRerunXCommand(action.getCoordId(), RestConstants.JOB_COORD_SCOPE_DATE, dateScope, 161 refresh, noCleanup)); 162 updateBundleAction(action); 163 isUpdateActionDone = true; 164 } 165 } 166 } 167 if (!isUpdateActionDone) { 168 transitToPrevious(); 169 } 170 LOG.info("Rerun coord jobs for the bundle=[{0}]", jobId); 171 } 172 173 private final void transitToPrevious() throws CommandException { 174 bundleJob.setStatus(getPrevStatus()); 175 if (!prevPending) { 176 bundleJob.resetPending(); 177 } 178 else { 179 bundleJob.setPending(); 180 } 181 updateJob(); 182 } 183 184 /** 185 * Update bundle action 186 * 187 * @param action the bundle action 188 * @throws CommandException thrown if failed to update bundle action 189 */ 190 private void updateBundleAction(BundleActionBean action) { 191 action.incrementAndGetPending(); 192 action.setLastModifiedTime(new Date()); 193 updateList.add(new UpdateEntry<BundleActionQuery>(BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, action)); 194 } 195 196 /* (non-Javadoc) 197 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 198 */ 199 @Override 200 public void updateJob() { 201 // rerun a paused bundle job will keep job status at paused and pending at previous pending 202 if (getPrevStatus() != null) { 203 Job.Status bundleJobStatus = getPrevStatus(); 204 if (bundleJobStatus.equals(Job.Status.PAUSED) || bundleJobStatus.equals(Job.Status.PAUSEDWITHERROR)) { 205 bundleJob.setStatus(bundleJobStatus); 206 if (prevPending) { 207 bundleJob.setPending(); 208 } 209 else { 210 bundleJob.resetPending(); 211 } 212 } 213 } 214 updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob)); 215 } 216 217 /* (non-Javadoc) 218 * @see org.apache.oozie.command.RerunTransitionXCommand#performWrites() 219 */ 220 @Override 221 public void performWrites() throws CommandException { 222 try { 223 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 224 } 225 catch (JPAExecutorException e) { 226 throw new CommandException(e); 227 } 228 } 229 230 /* (non-Javadoc) 231 * @see org.apache.oozie.command.XCommand#getEntityKey() 232 */ 233 @Override 234 public String getEntityKey() { 235 return jobId; 236 } 237 238 /* (non-Javadoc) 239 * @see org.apache.oozie.command.XCommand#isLockRequired() 240 */ 241 @Override 242 protected boolean isLockRequired() { 243 return true; 244 } 245 246 /* 247 * (non-Javadoc) 248 * @see org.apache.oozie.command.TransitionXCommand#getJob() 249 */ 250 @Override 251 public Job getJob() { 252 return bundleJob; 253 } 254 255 /* (non-Javadoc) 256 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 257 */ 258 @Override 259 public void notifyParent() throws CommandException { 260 261 } 262 263 /* (non-Javadoc) 264 * @see org.apache.oozie.command.RerunTransitionXCommand#getLog() 265 */ 266 @Override 267 public XLog getLog() { 268 return LOG; 269 } 270 271 private final CoordinatorJobBean getCoordJob(String coordId) throws CommandException { 272 try { 273 CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId); 274 return job; 275 } 276 catch (JPAExecutorException je) { 277 throw new CommandException(je); 278 } 279 280 } 281 282}