001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.util.ArrayList; 021 import java.util.Calendar; 022 import java.util.Date; 023 import java.util.HashSet; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Map.Entry; 027 import java.util.Set; 028 029 import org.apache.oozie.CoordinatorActionBean; 030 import org.apache.oozie.CoordinatorJobBean; 031 import org.apache.oozie.ErrorCode; 032 import org.apache.oozie.XException; 033 import org.apache.oozie.client.CoordinatorJob; 034 import org.apache.oozie.client.Job; 035 import org.apache.oozie.client.OozieClient; 036 import org.apache.oozie.client.rest.JsonBean; 037 import org.apache.oozie.command.CommandException; 038 import org.apache.oozie.command.PreconditionException; 039 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 040 import org.apache.oozie.coord.TimeUnit; 041 import org.apache.oozie.executor.jpa.BulkUpdateDeleteJPAExecutor; 042 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; 043 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor; 044 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 045 import org.apache.oozie.executor.jpa.JPAExecutorException; 046 import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor; 047 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor; 048 import org.apache.oozie.service.JPAService; 049 import org.apache.oozie.service.Services; 050 import org.apache.oozie.sla.SLARegistrationBean; 051 import org.apache.oozie.sla.SLASummaryBean; 052 import org.apache.oozie.sla.service.SLAService; 053 import org.apache.oozie.util.DateUtils; 054 import org.apache.oozie.util.JobUtils; 055 import org.apache.oozie.util.LogUtils; 056 import org.apache.oozie.util.ParamChecker; 057 import org.apache.oozie.util.StatusUtils; 058 059 public class CoordChangeXCommand extends CoordinatorXCommand<Void> { 060 private final String jobId; 061 private Date newEndTime = null; 062 private Integer newConcurrency = null; 063 private Date newPauseTime = null; 064 private Date oldPauseTime = null; 065 private boolean resetPauseTime = false; 066 private CoordinatorJobBean coordJob; 067 private JPAService jpaService = null; 068 private Job.Status prevStatus; 069 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 070 private List<JsonBean> deleteList = new ArrayList<JsonBean>(); 071 072 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>(); 073 static { 074 ALLOWED_CHANGE_OPTIONS.add("endtime"); 075 ALLOWED_CHANGE_OPTIONS.add("concurrency"); 076 ALLOWED_CHANGE_OPTIONS.add("pausetime"); 077 } 078 079 /** 080 * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update 081 * that to database. 082 * 083 * @param id Coordinator job id. 084 * @param changeValue This the changed value in the form key=value. 085 * @throws CommandException thrown if changeValue cannot be parsed properly. 086 */ 087 public CoordChangeXCommand(String id, String changeValue) throws CommandException { 088 super("coord_change", "coord_change", 0); 089 this.jobId = ParamChecker.notEmpty(id, "id"); 090 ParamChecker.notEmpty(changeValue, "value"); 091 092 validateChangeValue(changeValue); 093 } 094 095 /** 096 * @param changeValue change value. 097 * @throws CommandException thrown if changeValue cannot be parsed properly. 098 */ 099 private void validateChangeValue(String changeValue) throws CommandException { 100 Map<String, String> map = JobUtils.parseChangeValue(changeValue); 101 102 if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) { 103 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime"); 104 } 105 106 java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator(); 107 while (iter.hasNext()) { 108 Entry<String, String> entry = iter.next(); 109 String key = entry.getKey(); 110 String value = entry.getValue(); 111 112 if (!ALLOWED_CHANGE_OPTIONS.contains(key)) { 113 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime"); 114 } 115 116 if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) { 117 throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty"); 118 } 119 } 120 121 if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) { 122 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME); 123 try { 124 newEndTime = DateUtils.parseDateOozieTZ(value); 125 } 126 catch (Exception ex) { 127 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 128 } 129 } 130 131 if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) { 132 String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY); 133 try { 134 newConcurrency = Integer.parseInt(value); 135 } 136 catch (NumberFormatException ex) { 137 throw new CommandException(ErrorCode.E1015, value, "must be a valid integer"); 138 } 139 } 140 141 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) { 142 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME); 143 if (value.equals("")) { // this is to reset pause time to null; 144 resetPauseTime = true; 145 } 146 else { 147 try { 148 newPauseTime = DateUtils.parseDateOozieTZ(value); 149 } 150 catch (Exception ex) { 151 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 152 } 153 } 154 } 155 } 156 157 /** 158 * Returns the actual last action time(one instance before coordJob.lastActionTime) 159 * @return Date - last action time if coordJob.getLastActionTime() is not null, null otherwise 160 */ 161 private Date getLastActionTime() { 162 if(coordJob.getLastActionTime() == null) 163 return null; 164 165 Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone())); 166 d.setTime(coordJob.getLastActionTime()); 167 TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr()); 168 d.add(timeUnit.getCalendarUnit(), -Integer.valueOf(coordJob.getFrequency())); 169 return d.getTime(); 170 } 171 172 /** 173 * Check if new end time is valid. 174 * 175 * @param coordJob coordinator job id. 176 * @param newEndTime new end time. 177 * @throws CommandException thrown if new end time is not valid. 178 */ 179 private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException { 180 // New endTime cannot be before coordinator job's start time. 181 Date startTime = coordJob.getStartTime(); 182 if (newEndTime.before(startTime)) { 183 throw new CommandException(ErrorCode.E1015, newEndTime, "cannot be before coordinator job's start time [" 184 + startTime + "]"); 185 } 186 187 // New endTime cannot be before coordinator job's last action time. 188 Date lastActionTime = getLastActionTime(); 189 if (lastActionTime != null) { 190 if (!newEndTime.after(lastActionTime)) { 191 throw new CommandException(ErrorCode.E1015, newEndTime, 192 "must be after coordinator job's last action time [" + lastActionTime + "]"); 193 } 194 } 195 } 196 197 /** 198 * Check if new pause time is valid. 199 * 200 * @param coordJob coordinator job id. 201 * @param newPauseTime new pause time. 202 * @param newEndTime new end time, can be null meaning no change on end time. 203 * @throws CommandException thrown if new pause time is not valid. 204 */ 205 private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime) 206 throws CommandException { 207 // New pauseTime has to be a non-past time. 208 Date d = new Date(); 209 if (newPauseTime.before(d)) { 210 throw new CommandException(ErrorCode.E1015, newPauseTime, "must be a non-past time"); 211 } 212 } 213 214 /** 215 * Process lookahead created actions that become invalid because of the new pause time, 216 * These actions will be deleted from DB, also the coordinator job will be updated accordingly 217 * 218 * @param coordJob coordinator job 219 * @param newPauseTime new pause time 220 */ 221 private void processLookaheadActions(CoordinatorJobBean coordJob, Date newPauseTime) throws CommandException { 222 Date lastActionTime = coordJob.getLastActionTime(); 223 if (lastActionTime != null) { 224 // d is the real last action time. 225 Calendar d = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone())); 226 d.setTime(getLastActionTime()); 227 TimeUnit timeUnit = TimeUnit.valueOf(coordJob.getTimeUnitStr()); 228 229 int lastActionNumber = coordJob.getLastActionNumber(); 230 231 boolean hasChanged = false; 232 while (true) { 233 if (!newPauseTime.after(d.getTime())) { 234 deleteAction(lastActionNumber); 235 d.add(timeUnit.getCalendarUnit(), -Integer.valueOf(coordJob.getFrequency())); 236 lastActionNumber = lastActionNumber - 1; 237 238 hasChanged = true; 239 } 240 else { 241 break; 242 } 243 } 244 245 if (hasChanged == true) { 246 coordJob.setLastActionNumber(lastActionNumber); 247 d.add(timeUnit.getCalendarUnit(), Integer.valueOf(coordJob.getFrequency())); 248 Date d1 = d.getTime(); 249 coordJob.setLastActionTime(d1); 250 coordJob.setNextMaterializedTime(d1); 251 coordJob.resetDoneMaterialization(); 252 } 253 } 254 } 255 256 /** 257 * Delete coordinator action 258 * 259 * @param actionNum coordinator action number 260 */ 261 private void deleteAction(int actionNum) throws CommandException { 262 try { 263 String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum)); 264 CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); 265 // delete SLA registration entry (if any) for action 266 if (SLAService.isEnabled()) { 267 Services.get().get(SLAService.class).removeRegistration(actionId); 268 } 269 SLARegistrationBean slaReg = jpaService.execute(new SLARegistrationGetJPAExecutor(actionId)); 270 if (slaReg != null) { 271 LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId()); 272 deleteList.add(slaReg); 273 } 274 SLASummaryBean slaSummaryBean = jpaService.execute(new SLASummaryGetJPAExecutor(actionId)); 275 if (slaSummaryBean != null) { 276 LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId()); 277 deleteList.add(slaSummaryBean); 278 } 279 deleteList.add(bean); 280 } 281 catch (JPAExecutorException e) { 282 throw new CommandException(e); 283 } 284 } 285 286 /** 287 * Check if new end time, new concurrency, new pause time are valid. 288 * 289 * @param coordJob coordinator job id. 290 * @param newEndTime new end time. 291 * @param newConcurrency new concurrency. 292 * @param newPauseTime new pause time. 293 * @throws CommandException thrown if new values are not valid. 294 */ 295 private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime) 296 throws CommandException { 297 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED) { 298 throw new CommandException(ErrorCode.E1016); 299 } 300 301 if (newEndTime != null) { 302 checkEndTime(coordJob, newEndTime); 303 } 304 305 if (newPauseTime != null) { 306 checkPauseTime(coordJob, newPauseTime); 307 } 308 } 309 310 /* (non-Javadoc) 311 * @see org.apache.oozie.command.XCommand#execute() 312 */ 313 @Override 314 protected Void execute() throws CommandException { 315 LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId); 316 317 try { 318 if (newEndTime != null) { 319 coordJob.setEndTime(newEndTime); 320 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){ 321 coordJob.setStatus(CoordinatorJob.Status.RUNNING); 322 } 323 if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 324 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { 325 // Check for backward compatibility for Oozie versions (3.2 and before) 326 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and 327 // PAUSEDWITHERROR is not supported 328 coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); 329 } 330 coordJob.setPending(); 331 coordJob.resetDoneMaterialization(); 332 } 333 334 if (newConcurrency != null) { 335 this.coordJob.setConcurrency(newConcurrency); 336 } 337 338 if (newPauseTime != null || resetPauseTime == true) { 339 this.coordJob.setPauseTime(newPauseTime); 340 if (oldPauseTime != null && newPauseTime != null) { 341 if (oldPauseTime.before(newPauseTime)) { 342 if (this.coordJob.getStatus() == Job.Status.PAUSED) { 343 this.coordJob.setStatus(Job.Status.RUNNING); 344 } 345 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) { 346 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR); 347 } 348 } 349 } 350 else if (oldPauseTime != null && newPauseTime == null) { 351 if (this.coordJob.getStatus() == Job.Status.PAUSED) { 352 this.coordJob.setStatus(Job.Status.RUNNING); 353 } 354 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) { 355 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR); 356 } 357 } 358 if (!resetPauseTime) { 359 processLookaheadActions(coordJob, newPauseTime); 360 } 361 } 362 363 if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) { 364 LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus() 365 + ", set pending to true"); 366 // set doneMaterialization to true when materialization is done 367 coordJob.setDoneMaterialization(); 368 } 369 370 updateList.add(coordJob); 371 jpaService.execute(new BulkUpdateDeleteJPAExecutor(updateList, deleteList, false)); 372 373 return null; 374 } 375 catch (XException ex) { 376 throw new CommandException(ex); 377 } 378 finally { 379 LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId); 380 // update bundle action 381 if (coordJob.getBundleId() != null) { 382 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 383 bundleStatusUpdate.call(); 384 } 385 } 386 } 387 388 /* (non-Javadoc) 389 * @see org.apache.oozie.command.XCommand#getEntityKey() 390 */ 391 @Override 392 public String getEntityKey() { 393 return this.jobId; 394 } 395 396 /* (non-Javadoc) 397 * @see org.apache.oozie.command.XCommand#loadState() 398 */ 399 @Override 400 protected void loadState() throws CommandException{ 401 jpaService = Services.get().get(JPAService.class); 402 403 if (jpaService == null) { 404 throw new CommandException(ErrorCode.E0610); 405 } 406 407 try { 408 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 409 oldPauseTime = coordJob.getPauseTime(); 410 prevStatus = coordJob.getStatus(); 411 } 412 catch (JPAExecutorException e) { 413 throw new CommandException(e); 414 } 415 416 LogUtils.setLogInfo(this.coordJob, logInfo); 417 } 418 419 /* (non-Javadoc) 420 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 421 */ 422 @Override 423 protected void verifyPrecondition() throws CommandException,PreconditionException { 424 check(this.coordJob, newEndTime, newConcurrency, newPauseTime); 425 } 426 427 /* (non-Javadoc) 428 * @see org.apache.oozie.command.XCommand#isLockRequired() 429 */ 430 @Override 431 protected boolean isLockRequired() { 432 return true; 433 } 434 }