001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Map.Entry; 027import java.util.Set; 028 029import org.apache.commons.lang.StringUtils; 030import org.apache.oozie.CoordinatorActionBean; 031import org.apache.oozie.CoordinatorJobBean; 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.XException; 034import org.apache.oozie.client.CoordinatorAction; 035import org.apache.oozie.client.CoordinatorJob; 036import org.apache.oozie.client.Job; 037import org.apache.oozie.client.OozieClient; 038import org.apache.oozie.client.rest.JsonBean; 039import org.apache.oozie.command.CommandException; 040import org.apache.oozie.command.PreconditionException; 041import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 042import org.apache.oozie.executor.jpa.BatchQueryExecutor; 043import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 044import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 045import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; 046import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 047import org.apache.oozie.executor.jpa.JPAExecutorException; 048import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; 049import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; 050import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; 051import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; 052import org.apache.oozie.service.JPAService; 053import org.apache.oozie.service.Services; 054import org.apache.oozie.sla.SLARegistrationBean; 055import org.apache.oozie.sla.SLASummaryBean; 056import org.apache.oozie.sla.service.SLAService; 057import org.apache.oozie.util.DateUtils; 058import org.apache.oozie.util.JobUtils; 059import org.apache.oozie.util.LogUtils; 060import org.apache.oozie.util.ParamChecker; 061import org.apache.oozie.util.StatusUtils; 062 063public class CoordChangeXCommand extends CoordinatorXCommand<Void> { 064 private final String jobId; 065 private Date newEndTime = null; 066 private Integer oldConcurrency = null; 067 private Integer newConcurrency = null; 068 private Date newPauseTime = null; 069 private Date oldPauseTime = null; 070 private boolean resetPauseTime = false; 071 private CoordinatorJob.Status jobStatus = null; 072 private CoordinatorJobBean coordJob; 073 private JPAService jpaService = null; 074 private Job.Status prevStatus; 075 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 076 private List<JsonBean> deleteList = new ArrayList<JsonBean>(); 077 078 private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>(); 079 static { 080 ALLOWED_CHANGE_OPTIONS.add("endtime"); 081 ALLOWED_CHANGE_OPTIONS.add("concurrency"); 082 ALLOWED_CHANGE_OPTIONS.add("pausetime"); 083 ALLOWED_CHANGE_OPTIONS.add(OozieClient.CHANGE_VALUE_STATUS); 084 085 } 086 087 /** 088 * This command is used to update the Coordinator job with the new values Update the coordinator job bean and update 089 * that to database. 090 * 091 * @param id Coordinator job id. 092 * @param changeValue This the changed value in the form key=value. 093 * @throws CommandException thrown if changeValue cannot be parsed properly. 094 */ 095 public CoordChangeXCommand(String id, String changeValue) throws CommandException { 096 super("coord_change", "coord_change", 0); 097 this.jobId = ParamChecker.notEmpty(id, "id"); 098 ParamChecker.notEmpty(changeValue, "value"); 099 100 validateChangeValue(changeValue); 101 } 102 103 @Override 104 protected void setLogInfo() { 105 LogUtils.setLogInfo(jobId); 106 } 107 108 /** 109 * @param changeValue change value. 110 * @throws CommandException thrown if changeValue cannot be parsed properly. 111 */ 112 private void validateChangeValue(String changeValue) throws CommandException { 113 Map<String, String> map = JobUtils.parseChangeValue(changeValue); 114 115 if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) { 116 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime|status"); 117 } 118 119 java.util.Iterator<Entry<String, String>> iter = map.entrySet().iterator(); 120 while (iter.hasNext()) { 121 Entry<String, String> entry = iter.next(); 122 String key = entry.getKey(); 123 String value = entry.getValue(); 124 125 if (!ALLOWED_CHANGE_OPTIONS.contains(key)) { 126 throw new CommandException(ErrorCode.E1015, changeValue, "must change endtime|concurrency|pausetime|status"); 127 } 128 129 if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) { 130 throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty"); 131 } 132 } 133 134 if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) { 135 String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME); 136 try { 137 newEndTime = DateUtils.parseDateOozieTZ(value); 138 } 139 catch (Exception ex) { 140 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 141 } 142 } 143 144 if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) { 145 String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY); 146 try { 147 newConcurrency = Integer.parseInt(value); 148 } 149 catch (NumberFormatException ex) { 150 throw new CommandException(ErrorCode.E1015, value, "must be a valid integer"); 151 } 152 } 153 154 if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) { 155 String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME); 156 if (value.equals("")) { // this is to reset pause time to null; 157 resetPauseTime = true; 158 } 159 else { 160 try { 161 newPauseTime = DateUtils.parseDateOozieTZ(value); 162 } 163 catch (Exception ex) { 164 throw new CommandException(ErrorCode.E1015, value, "must be a valid date"); 165 } 166 } 167 } 168 169 if (map.containsKey(OozieClient.CHANGE_VALUE_STATUS)) { 170 String value = map.get(OozieClient.CHANGE_VALUE_STATUS); 171 if (!StringUtils.isEmpty(value)) { 172 jobStatus = CoordinatorJob.Status.valueOf(value); 173 } 174 } 175 } 176 177 /** 178 * Check if new end time is valid. 179 * 180 * @param coordJob coordinator job id. 181 * @param newEndTime new end time. 182 * @throws CommandException thrown if new end time is not valid. 183 */ 184 private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException { 185 //It's ok to set end date before start date. 186 } 187 188 /** 189 * Check if new pause time is valid. 190 * 191 * @param coordJob coordinator job id. 192 * @param newPauseTime new pause time. 193 * @param newEndTime new end time, can be null meaning no change on end time. 194 * @throws CommandException thrown if new pause time is not valid. 195 */ 196 private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime) 197 throws CommandException { 198 //no check. 199 } 200 201 /** 202 * Check if status change is valid. 203 * 204 * @param coordJob the coord job 205 * @param jobStatus the job status 206 * @throws CommandException the command exception 207 */ 208 private void checkStatusChange(CoordinatorJobBean coordJob, CoordinatorJob.Status jobStatus) 209 throws CommandException { 210 if (!jobStatus.equals(CoordinatorJob.Status.RUNNING) && !jobStatus.equals(CoordinatorJob.Status.IGNORED)) { 211 throw new CommandException(ErrorCode.E1015, jobStatus, " must be RUNNING or IGNORED"); 212 } 213 214 if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) { 215 if (!(coordJob.getStatus().equals(CoordinatorJob.Status.FAILED) || coordJob.getStatus().equals( 216 CoordinatorJob.Status.KILLED) || coordJob.getStatus().equals(CoordinatorJob.Status.IGNORED))) { 217 throw new CommandException(ErrorCode.E1015, jobStatus, 218 " Only FAILED, KILLED, IGNORED job can be changed to RUNNING. Current job status is " 219 + coordJob.getStatus()); 220 } 221 } 222 else { 223 if (!(coordJob.getStatus().equals(CoordinatorJob.Status.FAILED) || coordJob.getStatus().equals( 224 CoordinatorJob.Status.KILLED)) 225 || coordJob.isPending()) { 226 throw new CommandException(ErrorCode.E1015, jobStatus, 227 " Only FAILED or KILLED non-pending job can be changed to IGNORED. Current job status is " 228 + coordJob.getStatus() + " and pending status is " + coordJob.isPending()); 229 } 230 } 231 } 232 233 /** 234 * Process lookahead created actions that become invalid because of the new pause time, 235 * These actions will be deleted from DB, also the coordinator job will be updated accordingly 236 * 237 * @param coordJob coordinator job 238 * @param newPauseTime new pause time 239 * @throws JPAExecutorException, CommandException 240 */ 241 private void processLookaheadActions(CoordinatorJobBean coordJob, Date newTime) throws CommandException, 242 JPAExecutorException { 243 int lastActionNumber = coordJob.getLastActionNumber(); 244 Date lastActionTime = null; 245 Date tempDate = null; 246 247 while ((tempDate = deleteAction(lastActionNumber, newTime)) != null) { 248 lastActionNumber--; 249 lastActionTime = tempDate; 250 } 251 if (lastActionTime != null) { 252 LOG.debug("New pause/end date is : " + newTime + " and last action number is : " + lastActionNumber); 253 coordJob.setLastActionNumber(lastActionNumber); 254 coordJob.setLastActionTime(lastActionTime); 255 coordJob.setNextMaterializedTime(lastActionTime); 256 coordJob.resetDoneMaterialization(); 257 } 258 } 259 260 /** 261 * Delete coordinator action 262 * 263 * @param actionNum coordinator action number 264 */ 265 private Date deleteAction(int actionNum, Date afterDate) throws CommandException { 266 try { 267 if (actionNum <= 0) { 268 return null; 269 } 270 271 String actionId = jobId + "@" + actionNum; 272 CoordinatorActionBean bean = CoordActionQueryExecutor.getInstance().getIfExist( 273 CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION, actionId); 274 if (bean == null) { 275 return null; 276 } 277 if (afterDate.compareTo(bean.getNominalTime()) <= 0) { 278 if (bean.getStatus() == CoordinatorAction.Status.WAITING 279 || bean.getStatus() == CoordinatorAction.Status.READY) { 280 // delete SLA registration entry (if any) for action 281 if (SLAService.isEnabled()) { 282 Services.get().get(SLAService.class).removeRegistration(actionId); 283 } 284 SLARegistrationBean slaReg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, actionId); 285 if (slaReg != null) { 286 LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId()); 287 deleteList.add(slaReg); 288 } 289 SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get( 290 SLASummaryQuery.GET_SLA_SUMMARY, actionId); 291 if (slaSummaryBean != null) { 292 LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId()); 293 deleteList.add(slaSummaryBean); 294 } 295 deleteList.add(bean); 296 } 297 else { 298 throw new CommandException(ErrorCode.E1022, bean.getId()); 299 } 300 return bean.getNominalTime(); 301 } 302 else { 303 return null; 304 } 305 306 } 307 catch (JPAExecutorException e) { 308 throw new CommandException(e); 309 } 310 } 311 312 /** 313 * Check if new end time, new concurrency, new pause time are valid. 314 * 315 * @param coordJob coordinator job id. 316 * @param newEndTime new end time. 317 * @param newConcurrency new concurrency. 318 * @param newPauseTime new pause time. 319 * @throws CommandException thrown if new values are not valid. 320 */ 321 private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime, 322 CoordinatorJob.Status jobStatus) throws CommandException { 323 324 if (coordJob.getStatus() == CoordinatorJob.Status.KILLED 325 || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) { 326 if (jobStatus == null || (newEndTime != null || newConcurrency != null || newPauseTime != null)) { 327 throw new CommandException(ErrorCode.E1016); 328 } 329 } 330 331 if (newEndTime != null) { 332 checkEndTime(coordJob, newEndTime); 333 } 334 335 if (newPauseTime != null) { 336 checkPauseTime(coordJob, newPauseTime); 337 } 338 if (jobStatus != null) { 339 checkStatusChange(coordJob, jobStatus); 340 } 341 } 342 343 /* (non-Javadoc) 344 * @see org.apache.oozie.command.XCommand#execute() 345 */ 346 @Override 347 protected Void execute() throws CommandException { 348 LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId); 349 350 try { 351 oldConcurrency = this.coordJob.getConcurrency(); 352 if (newEndTime != null) { 353 // during coord materialization, nextMaterializedTime is set to 354 // startTime + n(actions materialized) * frequency and this can be AFTER endTime, 355 // while doneMaterialization is true. Hence the following checks 356 // for newEndTime being in the middle of endTime and nextMatdTime. 357 // Since job is already done materialization so no need to change 358 boolean dontChange = coordJob.getEndTime().before(newEndTime) 359 && coordJob.getNextMaterializedTime() != null 360 && coordJob.getNextMaterializedTime().after(newEndTime); 361 if (!dontChange) { 362 coordJob.setEndTime(newEndTime); 363 // OOZIE-1703, we should SUCCEEDED the coord, if it's in PREP and new endtime is before start time 364 if (coordJob.getStartTime().compareTo(newEndTime) >= 0) { 365 if (coordJob.getStatus() != CoordinatorJob.Status.PREP) { 366 processLookaheadActions(coordJob, newEndTime); 367 } 368 if (coordJob.getStatus() == CoordinatorJob.Status.PREP 369 || coordJob.getStatus() == CoordinatorJob.Status.RUNNING) { 370 LOG.info("Changing coord status to SUCCEEDED, because it's in " + coordJob.getStatus() 371 + " and new end time is before start time. Startime is " + coordJob.getStartTime() 372 + " and new end time is " + newEndTime); 373 374 coordJob.setStatus(CoordinatorJob.Status.SUCCEEDED); 375 coordJob.resetPending(); 376 } 377 coordJob.setDoneMaterialization(); 378 } 379 else { 380 // move it to running iff new end time is after starttime. 381 if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) { 382 coordJob.setStatus(CoordinatorJob.Status.RUNNING); 383 } 384 if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR 385 || coordJob.getStatus() == CoordinatorJob.Status.FAILED) { 386 // Check for backward compatibility for Oozie versions (3.2 and before) 387 // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and 388 // PAUSEDWITHERROR is not supported 389 coordJob.setStatus(StatusUtils 390 .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR)); 391 } 392 coordJob.setPending(); 393 coordJob.resetDoneMaterialization(); 394 processLookaheadActions(coordJob, newEndTime); 395 } 396 } 397 398 else { 399 LOG.info("Didn't change endtime. Endtime is in between coord end time and next materialization time." 400 + "Coord endTime = " + DateUtils.formatDateOozieTZ(newEndTime) 401 + " next materialization time =" 402 + DateUtils.formatDateOozieTZ(coordJob.getNextMaterializedTime())); 403 } 404 } 405 406 if (newConcurrency != null) { 407 this.coordJob.setConcurrency(newConcurrency); 408 } 409 410 if (newPauseTime != null || resetPauseTime == true) { 411 this.coordJob.setPauseTime(newPauseTime); 412 if (oldPauseTime != null && newPauseTime != null) { 413 if (oldPauseTime.before(newPauseTime)) { 414 if (this.coordJob.getStatus() == Job.Status.PAUSED) { 415 this.coordJob.setStatus(Job.Status.RUNNING); 416 } 417 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) { 418 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR); 419 } 420 } 421 } 422 else if (oldPauseTime != null && newPauseTime == null) { 423 if (this.coordJob.getStatus() == Job.Status.PAUSED) { 424 this.coordJob.setStatus(Job.Status.RUNNING); 425 } 426 else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) { 427 this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR); 428 } 429 } 430 if (!resetPauseTime) { 431 processLookaheadActions(coordJob, newPauseTime); 432 } 433 } 434 if (jobStatus != null) { 435 coordJob.setStatus(jobStatus); 436 LOG.info("Coord status is changed to " + jobStatus + " from " + prevStatus); 437 if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) { 438 coordJob.setPending(); 439 if (coordJob.getNextMaterializedTime() == null 440 || coordJob.getEndTime().after(coordJob.getNextMaterializedTime())) { 441 coordJob.resetDoneMaterialization(); 442 } 443 } else if (jobStatus.equals(CoordinatorJob.Status.IGNORED)) { 444 coordJob.resetPending(); 445 coordJob.setDoneMaterialization(); 446 } 447 } 448 449 if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) { 450 LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus() 451 + ", set pending to true"); 452 // set doneMaterialization to true when materialization is done 453 coordJob.setDoneMaterialization(); 454 } 455 456 coordJob.setLastModifiedTime(new Date()); 457 updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob)); 458 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList); 459 460 if (newConcurrency != null && newConcurrency > oldConcurrency) { 461 queue(new CoordActionReadyXCommand(jobId)); 462 } 463 464 return null; 465 } 466 catch (XException ex) { 467 throw new CommandException(ex); 468 } 469 finally { 470 LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId); 471 // update bundle action 472 if (coordJob.getBundleId() != null) { 473 //ignore pending as it'sync command 474 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus, true); 475 bundleStatusUpdate.call(); 476 } 477 } 478 } 479 480 /* (non-Javadoc) 481 * @see org.apache.oozie.command.XCommand#getEntityKey() 482 */ 483 @Override 484 public String getEntityKey() { 485 return this.jobId; 486 } 487 488 /* (non-Javadoc) 489 * @see org.apache.oozie.command.XCommand#loadState() 490 */ 491 @Override 492 protected void loadState() throws CommandException{ 493 jpaService = Services.get().get(JPAService.class); 494 495 if (jpaService == null) { 496 throw new CommandException(ErrorCode.E0610); 497 } 498 499 try { 500 this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); 501 oldPauseTime = coordJob.getPauseTime(); 502 prevStatus = coordJob.getStatus(); 503 } 504 catch (JPAExecutorException e) { 505 throw new CommandException(e); 506 } 507 508 LogUtils.setLogInfo(this.coordJob); 509 } 510 511 /* (non-Javadoc) 512 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 513 */ 514 @Override 515 protected void verifyPrecondition() throws CommandException,PreconditionException { 516 check(this.coordJob, newEndTime, newConcurrency, newPauseTime, jobStatus); 517 } 518 519 /* (non-Javadoc) 520 * @see org.apache.oozie.command.XCommand#isLockRequired() 521 */ 522 @Override 523 protected boolean isLockRequired() { 524 return true; 525 } 526}