001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.Date; 021 import java.util.HashSet; 022 import java.util.Map; 023 import java.util.Set; 024 import java.util.Map.Entry; 025 026 import org.apache.oozie.CoordinatorActionBean; 027 import org.apache.oozie.CoordinatorJobBean; 028 import org.apache.oozie.ErrorCode; 029 import org.apache.oozie.XException; 030 import org.apache.oozie.client.CoordinatorJob; 031 import org.apache.oozie.client.Job; 032 import org.apache.oozie.client.OozieClient; 033 import org.apache.oozie.command.CommandException; 034 import org.apache.oozie.command.PreconditionException; 035 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 036 import org.apache.oozie.executor.jpa.CoordActionRemoveJPAExecutor; 037 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor; 038 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 039 import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor; 040 import org.apache.oozie.executor.jpa.JPAExecutorException; 041 import org.apache.oozie.service.JPAService; 042 import org.apache.oozie.service.Services; 043 import org.apache.oozie.util.DateUtils; 044 import org.apache.oozie.util.JobUtils; 045 import org.apache.oozie.util.LogUtils; 046 import org.apache.oozie.util.ParamChecker; 047 048 public class CoordChangeXCommand extends CoordinatorXCommand<Void> { 049 private final String jobId; 050 private Date newEndTime = null; 051 private Integer newConcurrency = null; 052 private Date newPauseTime = null; 053 private Date oldPauseTime = null; 054 private boolean resetPauseTime = false; 055 private CoordinatorJobBean coordJob; 056 private JPAService jpaService = null; 057 private Job.Status prevStatus; 058 059 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>(); 060 static { 061 ALLOWED_CHANGE_OPTIONS.add("endtime"); 062 ALLOWED_CHANGE_OPTIONS.add("concurrency"); 063 ALLOWED_CHANGE_OPTIONS.add("pausetime"); 064 } 065 066 /** 067 * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update 068 * that to database. 069 * 070 * @param id Coordinator job id. 071 * @param changeValue This the changed value in the form key=value. 072 * @throws CommandException thrown if changeValue cannot be parsed properly. 073 */ 074 public CoordChangeXCommand(String id, String changeValue) throws CommandException { 075 super("coord_change", "coord_change", 0); 076 this.jobId = ParamChecker.notEmpty(id, "id"); 077 ParamChecker.notEmpty(changeValue, "value"); 078 079 validateChangeValue(changeValue); 080 } 081 082 /** 083 * @param changeValue change value. 084 * @throws CommandException thrown if changeValue cannot be parsed properly. 085 */ 086 private void validateChangeValue(String changeValue) throws CommandException { 087 Map<String, String> map = JobUtils.parseChangeValue(changeValue); 088 089 if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) { 090 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime"); 091 } 092 093 java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator(); 094 while (iter.hasNext()) { 095 Entry<String, String> entry = iter.next(); 096 String key = entry.getKey(); 097 String value = entry.getValue(); 098 099 if (!ALLOWED_CHANGE_OPTIONS.contains(key)) { 100 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime"); 101 } 102 103 if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) { 104 throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty"); 105 } 106 } 107 108 if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) { 109 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME); 110 try { 111 newEndTime = DateUtils.parseDateUTC(value); 112 } 113 catch (Exception ex) { 114 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 115 } 116 } 117 118 if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) { 119 String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY); 120 try { 121 newConcurrency = Integer.parseInt(value); 122 } 123 catch (NumberFormatException ex) { 124 throw new CommandException(ErrorCode.E1015, value, "must be a valid integer"); 125 } 126 } 127 128 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) { 129 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME); 130 if (value.equals("")) { // this is to reset pause time to null; 131 resetPauseTime = true; 132 } 133 else { 134 try { 135 newPauseTime = DateUtils.parseDateUTC(value); 136 } 137 catch (Exception ex) { 138 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 139 } 140 } 141 } 142 } 143 144 /** 145 * Check if new end time is valid. 146 * 147 * @param coordJob coordinator job id. 148 * @param newEndTime new end time. 149 * @throws CommandException thrown if new end time is not valid. 150 */ 151 private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException { 152 // New endTime cannot be before coordinator job's start time. 153 Date startTime = coordJob.getStartTime(); 154 if (newEndTime.before(startTime)) { 155 throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time [" 156 + startTime + "]"); 157 } 158 159 // New endTime cannot be before coordinator job's last action time. 160 Date lastActionTime = coordJob.getLastActionTime(); 161 if (lastActionTime != null) { 162 Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000); 163 if (!newEndTime.after(d)) { 164 throw new CommandException(ErrorCode.E1015, newEndTime, 165 "must be after coordinator job's last action time [" + d + "]"); 166 } 167 } 168 } 169 170 /** 171 * Check if new pause time is valid. 172 * 173 * @param coordJob coordinator job id. 174 * @param newPauseTime new pause time. 175 * @param newEndTime new end time, can be null meaning no change on end time. 176 * @throws CommandException thrown if new pause time is not valid. 177 */ 178 private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime) 179 throws CommandException { 180 // New pauseTime has to be a non-past time. 181 Date d = new Date(); 182 if (newPauseTime.before(d)) { 183 throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time"); 184 } 185 } 186 187 /** 188 * Process lookahead created actions that become invalid because of the new pause time, 189 * These actions will be deleted from DB, also the coordinator job will be updated accordingly 190 * 191 * @param coordJob coordinator job 192 * @param newPauseTime new pause time 193 */ 194 private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException { 195 Date lastActionTime = coordJob.getLastActionTime(); 196 if (lastActionTime != null) { 197 // d is the real last action time. 198 Date d = new Date(lastActionTime.getTime() - coordJob.getFrequency() * 60 * 1000); 199 int lastActionNumber = coordJob.getLastActionNumber(); 200 201 boolean hasChanged = false; 202 while (true) { 203 if (!newPauseTime.after(d)) { 204 deleteAction(coordJob.getId(), lastActionNumber); 205 d = new Date(d.getTime() - coordJob.getFrequency() * 60 * 1000); 206 lastActionNumber = lastActionNumber - 1; 207 208 hasChanged = true; 209 } 210 else { 211 break; 212 } 213 } 214 215 if (hasChanged == true) { 216 coordJob.setLastActionNumber(lastActionNumber); 217 Date d1 = new Date(d.getTime() + coordJob.getFrequency() * 60 * 1000); 218 coordJob.setLastActionTime(d1); 219 coordJob.setNextMaterializedTime(d1); 220 coordJob.resetDoneMaterialization(); 221 } 222 } 223 } 224 225 /** 226 * Delete last action for a coordinator job 227 * 228 * @param coordJob coordinator job 229 * @param lastActionNum last action number of the coordinator job 230 */ 231 private void deleteAction(String jobId, int lastActionNum) throws CommandException { 232 try { 233 CoordinatorActionBean actionBean = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, lastActionNum)); 234 jpaService.execute(new CoordActionRemoveJPAExecutor(actionBean.getId())); 235 } 236 catch (JPAExecutorException e) { 237 throw new CommandException(e); 238 } 239 } 240 241 /** 242 * Check if new end time, new concurrency, new pause time are valid. 243 * 244 * @param coordJob coordinator job id. 245 * @param newEndTime new end time. 246 * @param newConcurrency new concurrency. 247 * @param newPauseTime new pause time. 248 * @throws CommandException thrown if new values are not valid. 249 */ 250 private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime) 251 throws CommandException { 252 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 253 throw new CommandException(ErrorCode.E1016); 254 } 255 256 if (newEndTime != null) { 257 checkEndTime(coordJob, newEndTime); 258 } 259 260 if (newPauseTime != null) { 261 checkPauseTime(coordJob, newPauseTime); 262 } 263 } 264 265 /* (non-Javadoc) 266 * @see org.apache.oozie.command.XCommand#execute() 267 */ 268 @Override 269 protected Void execute() throws CommandException { 270 LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId); 271 272 try { 273 if (newEndTime != null) { 274 coordJob.setEndTime(newEndTime); 275 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED 276 || coordJob.getStatus() == CoordinatorJob.Status.RUNNING 277 || coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 278 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { 279 coordJob.setStatus(CoordinatorJob.Status.RUNNING); 280 coordJob.setPending(); 281 coordJob.resetDoneMaterialization(); 282 } 283 } 284 285 if (newConcurrency != null) { 286 this.coordJob.setConcurrency(newConcurrency); 287 } 288 289 if (newPauseTime != null || resetPauseTime == true) { 290 this.coordJob.setPauseTime(newPauseTime); 291 if (oldPauseTime != null && newPauseTime != null) { 292 if (oldPauseTime.before(newPauseTime) && this.coordJob.getStatus() == Job.Status.PAUSED) { 293 this.coordJob.setStatus(Job.Status.RUNNING); 294 } 295 } 296 else if (oldPauseTime != null && newPauseTime == null && this.coordJob.getStatus() == Job.Status.PAUSED) { 297 this.coordJob.setStatus(Job.Status.RUNNING); 298 } 299 300 if (!resetPauseTime) { 301 processLookaheadActions(coordJob, newPauseTime); 302 } 303 } 304 305 jpaService.execute(new CoordJobUpdateJPAExecutor(this.coordJob)); 306 307 return null; 308 } 309 catch (XException ex) { 310 throw new CommandException(ex); 311 } 312 finally { 313 LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId); 314 // update bundle action 315 if (coordJob.getBundleId() != null) { 316 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 317 bundleStatusUpdate.call(); 318 } 319 } 320 } 321 322 /* (non-Javadoc) 323 * @see org.apache.oozie.command.XCommand#getEntityKey() 324 */ 325 @Override 326 protected String getEntityKey() { 327 return this.jobId; 328 } 329 330 /* (non-Javadoc) 331 * @see org.apache.oozie.command.XCommand#loadState() 332 */ 333 @Override 334 protected void loadState() throws CommandException{ 335 jpaService = Services.get().get(JPAService.class); 336 337 if (jpaService == null) { 338 throw new CommandException(ErrorCode.E0610); 339 } 340 341 try { 342 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 343 oldPauseTime = coordJob.getPauseTime(); 344 prevStatus = coordJob.getStatus(); 345 } 346 catch (JPAExecutorException e) { 347 throw new CommandException(e); 348 } 349 350 LogUtils.setLogInfo(this.coordJob, logInfo); 351 } 352 353 /* (non-Javadoc) 354 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 355 */ 356 @Override 357 protected void verifyPrecondition() throws CommandException,PreconditionException { 358 check(this.coordJob, newEndTime, newConcurrency, newPauseTime); 359 } 360 361 /* (non-Javadoc) 362 * @see org.apache.oozie.command.XCommand#isLockRequired() 363 */ 364 @Override 365 protected boolean isLockRequired() { 366 return true; 367 } 368 }