001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.ArrayList; 021 import java.util.Calendar; 022 import java.util.Date; 023 import java.util.HashSet; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Map.Entry; 027 import java.util.Set; 028 029 import org.apache.oozie.CoordinatorActionBean; 030 import org.apache.oozie.CoordinatorJobBean; 031 import org.apache.oozie.ErrorCode; 032 import org.apache.oozie.XException; 033 import org.apache.oozie.client.CoordinatorJob; 034 import org.apache.oozie.client.Job; 035 import org.apache.oozie.client.OozieClient; 036 import org.apache.oozie.client.rest.JsonBean; 037 import org.apache.oozie.command.CommandException; 038 import org.apache.oozie.command.PreconditionException; 039 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 040 import org.apache.oozie.coord.TimeUnit; 041 import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor; 042 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; 043 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor; 044 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 045 import org.apache.oozie.executor.jpa.JPAExecutorException; 046 import org.apache.oozie.service.JPAService; 047 import org.apache.oozie.service.Services; 048 import org.apache.oozie.util.DateUtils; 049 import org.apache.oozie.util.JobUtils; 050 import org.apache.oozie.util.LogUtils; 051 import org.apache.oozie.util.ParamChecker; 052 import org.apache.oozie.util.StatusUtils; 053 054 public class CoordChangeXCommand extends CoordinatorXCommand<Void> { 055 private final String jobId; 056 private Date newEndTime = null; 057 private Integer newConcurrency = null; 058 private Date newPauseTime = null; 059 private Date oldPauseTime = null; 060 private boolean resetPauseTime = false; 061 private CoordinatorJobBean coordJob; 062 private JPAService jpaService = null; 063 private Job.Status prevStatus; 064 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 065 private List<JsonBean> deleteList = new ArrayList<JsonBean>(); 066 067 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>(); 068 static { 069 ALLOWED_CHANGE_OPTIONS.add("endtime"); 070 ALLOWED_CHANGE_OPTIONS.add("concurrency"); 071 ALLOWED_CHANGE_OPTIONS.add("pausetime"); 072 } 073 074 /** 075 * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update 076 * that to database. 077 * 078 * @param id Coordinator job id. 079 * @param changeValue This the changed value in the form key=value. 080 * @throws CommandException thrown if changeValue cannot be parsed properly. 081 */ 082 public CoordChangeXCommand(String id, String changeValue) throws CommandException { 083 super("coord_change", "coord_change", 0); 084 this.jobId = ParamChecker.notEmpty(id, "id"); 085 ParamChecker.notEmpty(changeValue, "value"); 086 087 validateChangeValue(changeValue); 088 } 089 090 /** 091 * @param changeValue change value. 092 * @throws CommandException thrown if changeValue cannot be parsed properly. 093 */ 094 private void validateChangeValue(String changeValue) throws CommandException { 095 Map<String, String> map = JobUtils.parseChangeValue(changeValue); 096 097 if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) { 098 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime"); 099 } 100 101 java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator(); 102 while (iter.hasNext()) { 103 Entry<String, String> entry = iter.next(); 104 String key = entry.getKey(); 105 String value = entry.getValue(); 106 107 if (!ALLOWED_CHANGE_OPTIONS.contains(key)) { 108 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime"); 109 } 110 111 if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) { 112 throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty"); 113 } 114 } 115 116 if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) { 117 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME); 118 try { 119 newEndTime = DateUtils.parseDateOozieTZ(value); 120 } 121 catch (Exception ex) { 122 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 123 } 124 } 125 126 if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) { 127 String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY); 128 try { 129 newConcurrency = Integer.parseInt(value); 130 } 131 catch (NumberFormatException ex) { 132 throw new CommandException(ErrorCode.E1015, value, "must be a valid integer"); 133 } 134 } 135 136 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) { 137 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME); 138 if (value.equals("")) { // this is to reset pause time to null; 139 resetPauseTime = true; 140 } 141 else { 142 try { 143 newPauseTime = DateUtils.parseDateOozieTZ(value); 144 } 145 catch (Exception ex) { 146 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 147 } 148 } 149 } 150 } 151 152 /** 153 * Returns the actual last action time(one instance before coordJob.lastActionTime) 154 * @return Date - last action time if coordJob.getLastActionTime() is not null, null otherwise 155 */ 156 private Date getLastActionTime() { 157 if(coordJob.getLastActionTime() == null) 158 return null; 159 160 Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone())); 161 d.setTime(coordJob.getLastActionTime()); 162 TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr()); 163 d.add(timeUnit.getCalendarUnit(), -coordJob.getFrequency()); 164 return d.getTime(); 165 } 166 167 /** 168 * Check if new end time is valid. 169 * 170 * @param coordJob coordinator job id. 171 * @param newEndTime new end time. 172 * @throws CommandException thrown if new end time is not valid. 173 */ 174 private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException { 175 // New endTime cannot be before coordinator job's start time. 176 Date startTime = coordJob.getStartTime(); 177 if (newEndTime.before(startTime)) { 178 throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time [" 179 + startTime + "]"); 180 } 181 182 // New endTime cannot be before coordinator job's last action time. 183 Date lastActionTime = getLastActionTime(); 184 if (lastActionTime != null) { 185 if (!newEndTime.after(lastActionTime)) { 186 throw new CommandException(ErrorCode.E1015, newEndTime, 187 "must be after coordinator job's last action time [" + lastActionTime + "]"); 188 } 189 } 190 } 191 192 /** 193 * Check if new pause time is valid. 194 * 195 * @param coordJob coordinator job id. 196 * @param newPauseTime new pause time. 197 * @param newEndTime new end time, can be null meaning no change on end time. 198 * @throws CommandException thrown if new pause time is not valid. 199 */ 200 private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime) 201 throws CommandException { 202 // New pauseTime has to be a non-past time. 203 Date d = new Date(); 204 if (newPauseTime.before(d)) { 205 throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time"); 206 } 207 } 208 209 /** 210 * Process lookahead created actions that become invalid because of the new pause time, 211 * These actions will be deleted from DB, also the coordinator job will be updated accordingly 212 * 213 * @param coordJob coordinator job 214 * @param newPauseTime new pause time 215 */ 216 private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException { 217 Date lastActionTime = coordJob.getLastActionTime(); 218 if (lastActionTime != null) { 219 // d is the real last action time. 220 Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone())); 221 d.setTime(getLastActionTime()); 222 TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr()); 223 224 int lastActionNumber = coordJob.getLastActionNumber(); 225 226 boolean hasChanged = false; 227 while (true) { 228 if (!newPauseTime.after(d.getTime())) { 229 deleteAction(lastActionNumber); 230 d.add(timeUnit.getCalendarUnit(), -coordJob.getFrequency()); 231 lastActionNumber = lastActionNumber - 1; 232 233 hasChanged = true; 234 } 235 else { 236 break; 237 } 238 } 239 240 if (hasChanged == true) { 241 coordJob.setLastActionNumber(lastActionNumber); 242 d.add(timeUnit.getCalendarUnit(), coordJob.getFrequency()); 243 Date d1 = d.getTime(); 244 coordJob.setLastActionTime(d1); 245 coordJob.setNextMaterializedTime(d1); 246 coordJob.resetDoneMaterialization(); 247 } 248 } 249 } 250 251 /** 252 * Delete coordinator action 253 * 254 * @param actionNum coordinator action number 255 */ 256 private void deleteAction(int actionNum) throws CommandException { 257 try { 258 String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum)); 259 CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); 260 deleteList.add(bean); 261 } catch (JPAExecutorException e) { 262 throw new CommandException(e); 263 } 264 } 265 266 /** 267 * Check if new end time, new concurrency, new pause time are valid. 268 * 269 * @param coordJob coordinator job id. 270 * @param newEndTime new end time. 271 * @param newConcurrency new concurrency. 272 * @param newPauseTime new pause time. 273 * @throws CommandException thrown if new values are not valid. 274 */ 275 private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime) 276 throws CommandException { 277 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 278 throw new CommandException(ErrorCode.E1016); 279 } 280 281 if (newEndTime != null) { 282 checkEndTime(coordJob, newEndTime); 283 } 284 285 if (newPauseTime != null) { 286 checkPauseTime(coordJob, newPauseTime); 287 } 288 } 289 290 /* (non-Javadoc) 291 * @see org.apache.oozie.command.XCommand#execute() 292 */ 293 @Override 294 protected Void execute() throws CommandException { 295 LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId); 296 297 try { 298 if (newEndTime != null) { 299 coordJob.setEndTime(newEndTime); 300 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){ 301 coordJob.setStatus(CoordinatorJob.Status.RUNNING); 302 } 303 if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 304 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { 305 // Check for backward compatibility for Oozie versions (3.2 and before) 306 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and 307 // PAUSEDWITHERROR is not supported 308 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); 309 } 310 coordJob.setPending(); 311 coordJob.resetDoneMaterialization(); 312 } 313 314 if (newConcurrency != null) { 315 this.coordJob.setConcurrency(newConcurrency); 316 } 317 318 if (newPauseTime != null || resetPauseTime == true) { 319 this.coordJob.setPauseTime(newPauseTime); 320 if (oldPauseTime != null && newPauseTime != null) { 321 if (oldPauseTime.before(newPauseTime)) { 322 if (this.coordJob.getStatus() == Job.Status.PAUSED) { 323 this.coordJob.setStatus(Job.Status.RUNNING); 324 } 325 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) { 326 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR); 327 } 328 } 329 } 330 else if (oldPauseTime != null && newPauseTime == null) { 331 if (this.coordJob.getStatus() == Job.Status.PAUSED) { 332 this.coordJob.setStatus(Job.Status.RUNNING); 333 } 334 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) { 335 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR); 336 } 337 } 338 if (!resetPauseTime) { 339 processLookaheadActions(coordJob, newPauseTime); 340 } 341 } 342 343 if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) { 344 LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus() 345 + ", set pending to true"); 346 // set doneMaterialization to true when materialization is done 347 coordJob.setDoneMaterialization(); 348 } 349 350 updateList.add(coordJob); 351 jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false)); 352 353 return null; 354 } 355 catch (XException ex) { 356 throw new CommandException(ex); 357 } 358 finally { 359 LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId); 360 // update bundle action 361 if (coordJob.getBundleId() != null) { 362 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 363 bundleStatusUpdate.call(); 364 } 365 } 366 } 367 368 /* (non-Javadoc) 369 * @see org.apache.oozie.command.XCommand#getEntityKey() 370 */ 371 @Override 372 public String getEntityKey() { 373 return this.jobId; 374 } 375 376 /* (non-Javadoc) 377 * @see org.apache.oozie.command.XCommand#loadState() 378 */ 379 @Override 380 protected void loadState() throws CommandException{ 381 jpaService = Services.get().get(JPAService.class); 382 383 if (jpaService == null) { 384 throw new CommandException(ErrorCode.E0610); 385 } 386 387 try { 388 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 389 oldPauseTime = coordJob.getPauseTime(); 390 prevStatus = coordJob.getStatus(); 391 } 392 catch (JPAExecutorException e) { 393 throw new CommandException(e); 394 } 395 396 LogUtils.setLogInfo(this.coordJob, logInfo); 397 } 398 399 /* (non-Javadoc) 400 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 401 */ 402 @Override 403 protected void verifyPrecondition() throws CommandException,PreconditionException { 404 check(this.coordJob, newEndTime, newConcurrency, newPauseTime); 405 } 406 407 /* (non-Javadoc) 408 * @see org.apache.oozie.command.XCommand#isLockRequired() 409 */ 410 @Override 411 protected boolean isLockRequired() { 412 return true; 413 } 414 }