/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.coord;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.SLASummaryBean;
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.StatusUtils;

/**
 * Command that applies a "change" request to a coordinator job. The request may carry a new end time, a new
 * concurrency, a new (or cleared) pause time and/or a new status. The change value is parsed and validated in the
 * constructor; the job is loaded in {@link #loadState()}, checked in {@link #verifyPrecondition()} and updated and
 * persisted in {@link #execute()}.
 */
public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
    /** Error detail used whenever the change value contains an unsupported (or too many) option(s). */
    private static final String ALLOWED_OPTIONS_MESSAGE = "must change endtime|concurrency|pausetime|status";

    private final String jobId;
    // Requested new values; each stays null when the corresponding option was not part of the change value.
    private Date newEndTime = null;
    private Integer newConcurrency = null;
    private Date newPauseTime = null;
    // Pause time of the job as loaded from the DB, captured in loadState().
    private Date oldPauseTime = null;
    // True when "pausetime" was supplied with an empty value, i.e. the pause time must be cleared.
    private boolean resetPauseTime = false;
    private CoordinatorJob.Status jobStatus = null;
    private CoordinatorJobBean coordJob;
    private JPAService jpaService = null;
    // Status of the job as loaded from the DB, captured in loadState().
    private Job.Status prevStatus;
    // Beans to update / delete in one batch at the end of execute().
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    private List<JsonBean> deleteList = new ArrayList<JsonBean>();

    private static final Set<String> ALLOWED_CHANGE_OPTIONS = new HashSet<String>();
    static {
        // Use the same OozieClient constants that validateChangeValue() looks up, so the two can never drift apart.
        ALLOWED_CHANGE_OPTIONS.add(OozieClient.CHANGE_VALUE_ENDTIME);
        ALLOWED_CHANGE_OPTIONS.add(OozieClient.CHANGE_VALUE_CONCURRENCY);
        ALLOWED_CHANGE_OPTIONS.add(OozieClient.CHANGE_VALUE_PAUSETIME);
        ALLOWED_CHANGE_OPTIONS.add(OozieClient.CHANGE_VALUE_STATUS);
    }

    /**
     * This command is used to update the Coordinator job with the new values. It updates the coordinator job bean
     * and writes that to the database.
     *
     * @param id Coordinator job id.
     * @param changeValue The changed value(s) in the form key=value.
     * @throws CommandException thrown if changeValue cannot be parsed properly.
     */
    public CoordChangeXCommand(String id, String changeValue) throws CommandException {
        super("coord_change", "coord_change", 0);
        this.jobId = ParamChecker.notEmpty(id, "id");
        ParamChecker.notEmpty(changeValue, "value");

        validateChangeValue(changeValue);
    }

    /**
     * Parse the change value and populate {@code newEndTime}, {@code newConcurrency}, {@code newPauseTime} /
     * {@code resetPauseTime} and {@code jobStatus} from it.
     *
     * @param changeValue change value.
     * @throws CommandException thrown if changeValue cannot be parsed properly: unknown option, empty value on an
     *         option other than pausetime, malformed date, malformed integer or unknown status name.
     */
    private void validateChangeValue(String changeValue) throws CommandException {
        Map<String, String> map = JobUtils.parseChangeValue(changeValue);

        if (map.size() > ALLOWED_CHANGE_OPTIONS.size()) {
            throw new CommandException(ErrorCode.E1015, changeValue, ALLOWED_OPTIONS_MESSAGE);
        }

        for (Entry<String, String> entry : map.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();

            if (!ALLOWED_CHANGE_OPTIONS.contains(key)) {
                throw new CommandException(ErrorCode.E1015, changeValue, ALLOWED_OPTIONS_MESSAGE);
            }

            // Only pausetime may be empty: an empty pausetime means "clear the pause time".
            if (!key.equals(OozieClient.CHANGE_VALUE_PAUSETIME) && value.equalsIgnoreCase("")) {
                throw new CommandException(ErrorCode.E1015, changeValue, "value on " + key + " can not be empty");
            }
        }

        if (map.containsKey(OozieClient.CHANGE_VALUE_ENDTIME)) {
            String value = map.get(OozieClient.CHANGE_VALUE_ENDTIME);
            try {
                newEndTime = DateUtils.parseDateOozieTZ(value);
            }
            catch (Exception ex) {
                throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
            }
        }

        if (map.containsKey(OozieClient.CHANGE_VALUE_CONCURRENCY)) {
            String value = map.get(OozieClient.CHANGE_VALUE_CONCURRENCY);
            try {
                newConcurrency = Integer.parseInt(value);
            }
            catch (NumberFormatException ex) {
                throw new CommandException(ErrorCode.E1015, value, "must be a valid integer");
            }
        }

        if (map.containsKey(OozieClient.CHANGE_VALUE_PAUSETIME)) {
            String value = map.get(OozieClient.CHANGE_VALUE_PAUSETIME);
            if (value.equals("")) { // this is to reset pause time to null;
                resetPauseTime = true;
            }
            else {
                try {
                    newPauseTime = DateUtils.parseDateOozieTZ(value);
                }
                catch (Exception ex) {
                    throw new CommandException(ErrorCode.E1015, value, "must be a valid date");
                }
            }
        }

        if (map.containsKey(OozieClient.CHANGE_VALUE_STATUS)) {
            String value = map.get(OozieClient.CHANGE_VALUE_STATUS);
            if (!StringUtils.isEmpty(value)) {
                try {
                    jobStatus = CoordinatorJob.Status.valueOf(value);
                }
                catch (IllegalArgumentException ex) {
                    // Enum.valueOf rejects unknown names with an unchecked exception; report it like every other
                    // malformed change value instead of letting it escape the constructor.
                    throw new CommandException(ErrorCode.E1015, value, "must be a valid coordinator job status");
                }
            }
        }
    }

    /**
     * Check if new end time is valid. Currently a no-op.
     *
     * @param coordJob coordinator job.
     * @param newEndTime new end time.
     * @throws CommandException thrown if new end time is not valid.
     */
    private void checkEndTime(CoordinatorJobBean coordJob, Date newEndTime) throws CommandException {
        //It's ok to set end date before start date.
    }

    /**
     * Check if new pause time is valid. Currently a no-op.
     *
     * @param coordJob coordinator job.
     * @param newPauseTime new pause time.
     * @throws CommandException thrown if new pause time is not valid.
     */
    private void checkPauseTime(CoordinatorJobBean coordJob, Date newPauseTime)
            throws CommandException {
        //no check.
    }

    /**
     * Check if status change is valid. Only RUNNING and IGNORED can be requested; RUNNING is allowed from FAILED,
     * KILLED or IGNORED, and IGNORED is allowed from a non-pending FAILED or KILLED job.
     *
     * @param coordJob the coord job
     * @param jobStatus the requested job status
     * @throws CommandException if the requested transition is not allowed
     */
    private void checkStatusChange(CoordinatorJobBean coordJob, CoordinatorJob.Status jobStatus)
            throws CommandException {
        if (!jobStatus.equals(CoordinatorJob.Status.RUNNING) && !jobStatus.equals(CoordinatorJob.Status.IGNORED)) {
            throw new CommandException(ErrorCode.E1015, jobStatus, " must be RUNNING or IGNORED");
        }

        boolean failedOrKilled = coordJob.getStatus().equals(CoordinatorJob.Status.FAILED)
                || coordJob.getStatus().equals(CoordinatorJob.Status.KILLED);

        if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) {
            if (!(failedOrKilled || coordJob.getStatus().equals(CoordinatorJob.Status.IGNORED))) {
                throw new CommandException(ErrorCode.E1015, jobStatus,
                        " Only FAILED, KILLED, IGNORED job can be changed to RUNNING. Current job status is "
                                + coordJob.getStatus());
            }
        }
        else {
            if (!failedOrKilled || coordJob.isPending()) {
                throw new CommandException(ErrorCode.E1015, jobStatus,
                        " Only FAILED or KILLED non-pending job can be changed to IGNORED. Current job status is "
                                + coordJob.getStatus() + " and pending status is " + coordJob.isPending());
            }
        }
    }

    /**
     * Process lookahead created actions that become invalid because of the new pause/end time. Walking backwards
     * from the job's last action number, every action whose nominal time is at or after {@code newTime} is queued
     * for deletion; if at least one action was queued, the coordinator job's last action number, last action time,
     * next materialized time and done-materialization flag are updated accordingly.
     *
     * @param coordJob coordinator job
     * @param newTime new pause time or new end time
     * @throws CommandException if an action that would have to be deleted is neither WAITING nor READY
     * @throws JPAExecutorException declared for callers; DB errors from {@link #deleteAction} arrive wrapped in
     *         {@link CommandException}
     */
    private void processLookaheadActions(CoordinatorJobBean coordJob, Date newTime) throws CommandException,
            JPAExecutorException {
        int lastActionNumber = coordJob.getLastActionNumber();
        Date lastActionTime = null;
        Date tempDate = null;

        while ((tempDate = deleteAction(lastActionNumber, newTime)) != null) {
            lastActionNumber--;
            lastActionTime = tempDate;
        }
        if (lastActionTime != null) {
            LOG.debug("New pause/end date is : " + newTime + " and last action number is : " + lastActionNumber);
            coordJob.setLastActionNumber(lastActionNumber);
            coordJob.setLastActionTime(lastActionTime);
            coordJob.setNextMaterializedTime(lastActionTime);
            coordJob.resetDoneMaterialization();
        }
    }

    /**
     * Queue one coordinator action (and its SLA registration/summary beans, if any) for deletion when its nominal
     * time is at or after {@code afterDate}.
     *
     * @param actionNum coordinator action number
     * @param afterDate cut-off date; only actions with nominal time &gt;= this date are deleted
     * @return the nominal time of the action queued for deletion, or {@code null} if {@code actionNum <= 0} or the
     *         action's nominal time is before {@code afterDate}
     * @throws CommandException with E1022 if the action would have to be deleted but is neither WAITING nor READY,
     *         or wrapping a {@link JPAExecutorException} on DB errors
     */
    private Date deleteAction(int actionNum, Date afterDate) throws CommandException {
        try {
            if (actionNum <= 0) {
                return null;
            }

            String actionId = jpaService.execute(new CoordJobGetActionByActionNumberJPAExecutor(jobId, actionNum));
            CoordinatorActionBean bean = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
            if (afterDate.compareTo(bean.getNominalTime()) > 0) {
                return null;
            }

            // Validate BEFORE touching any SLA state: an action that cannot be deleted must not lose its
            // SLA registration as a side effect of the failed attempt.
            if (bean.getStatus() != CoordinatorAction.Status.WAITING
                    && bean.getStatus() != CoordinatorAction.Status.READY) {
                throw new CommandException(ErrorCode.E1022, bean.getId());
            }

            // delete SLA registration entry (if any) for action
            if (SLAService.isEnabled()) {
                Services.get().get(SLAService.class).removeRegistration(actionId);
            }
            SLARegistrationBean slaReg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL,
                    actionId);
            if (slaReg != null) {
                LOG.debug("Deleting registration bean corresponding to action " + slaReg.getId());
                deleteList.add(slaReg);
            }
            SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
                    SLASummaryQuery.GET_SLA_SUMMARY, actionId);
            if (slaSummaryBean != null) {
                LOG.debug("Deleting summary bean corresponding to action " + slaSummaryBean.getId());
                deleteList.add(slaSummaryBean);
            }
            deleteList.add(bean);
            return bean.getNominalTime();
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }

    /**
     * Check if new end time, new concurrency, new pause time and new status are valid for the loaded job.
     *
     * @param coordJob coordinator job.
     * @param newEndTime new end time, or null if unchanged.
     * @param newConcurrency new concurrency, or null if unchanged.
     * @param newPauseTime new pause time, or null if unchanged/cleared.
     * @param jobStatus new job status, or null if unchanged.
     * @throws CommandException thrown if new values are not valid.
     */
    private void check(CoordinatorJobBean coordJob, Date newEndTime, Integer newConcurrency, Date newPauseTime,
            CoordinatorJob.Status jobStatus) throws CommandException {

        // A KILLED or IGNORED job only accepts a pure status change.
        if (coordJob.getStatus() == CoordinatorJob.Status.KILLED
                || coordJob.getStatus() == CoordinatorJob.Status.IGNORED) {
            if (jobStatus == null || (newEndTime != null || newConcurrency != null || newPauseTime != null)) {
                throw new CommandException(ErrorCode.E1016);
            }
        }

        if (newEndTime != null) {
            checkEndTime(coordJob, newEndTime);
        }

        if (newPauseTime != null) {
            checkPauseTime(coordJob, newPauseTime);
        }
        if (jobStatus != null) {
            checkStatusChange(coordJob, jobStatus);
        }
    }

    /**
     * Move a paused job back to its running counterpart: PAUSED becomes RUNNING and PAUSEDWITHERROR becomes
     * RUNNINGWITHERROR. Any other status is left untouched.
     */
    private void resumeIfPaused() {
        if (this.coordJob.getStatus() == Job.Status.PAUSED) {
            this.coordJob.setStatus(Job.Status.RUNNING);
        }
        else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
            this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
        }
    }

    /**
     * Apply the requested changes to the loaded coordinator job, persist the job together with any queued action /
     * SLA deletions in a single batch and, if the job belongs to a bundle, trigger a bundle status update.
     *
     * @return always {@code null}
     * @throws CommandException wrapping any {@link XException} raised while applying or persisting the change
     * @see org.apache.oozie.command.XCommand#execute()
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);

        try {
            if (newEndTime != null) {
                // during coord materialization, nextMaterializedTime is set to
                // startTime + n(actions materialized) * frequency and this can be AFTER endTime,
                // while doneMaterialization is true. Hence the following checks
                // for newEndTime being in the middle of endTime and nextMatdTime.
                // Since job is already done materialization so no need to change
                boolean dontChange = coordJob.getEndTime().before(newEndTime)
                        && coordJob.getNextMaterializedTime() != null
                        && coordJob.getNextMaterializedTime().after(newEndTime);
                if (!dontChange) {
                    coordJob.setEndTime(newEndTime);
                    // OOZIE-1703, we should SUCCEEDED the coord, if it's in PREP and new endtime is before start time
                    if (coordJob.getStartTime().compareTo(newEndTime) >= 0) {
                        if (coordJob.getStatus() != CoordinatorJob.Status.PREP) {
                            processLookaheadActions(coordJob, newEndTime);
                        }
                        if (coordJob.getStatus() == CoordinatorJob.Status.PREP
                                || coordJob.getStatus() == CoordinatorJob.Status.RUNNING) {
                            LOG.info("Changing coord status to SUCCEEDED, because it's in " + coordJob.getStatus()
                                    + " and new end time is before start time. Startime is " + coordJob.getStartTime()
                                    + " and new end time is " + newEndTime);

                            coordJob.setStatus(CoordinatorJob.Status.SUCCEEDED);
                            coordJob.resetPending();
                        }
                        coordJob.setDoneMaterialization();
                    }
                    else {
                        // move it to running iff new end time is after starttime.
                        if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
                            coordJob.setStatus(CoordinatorJob.Status.RUNNING);
                        }
                        if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
                                || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
                            // Check for backward compatibility for Oozie versions (3.2 and before)
                            // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
                            // PAUSEDWITHERROR is not supported
                            coordJob.setStatus(StatusUtils
                                    .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
                        }
                        coordJob.setPending();
                        coordJob.resetDoneMaterialization();
                        processLookaheadActions(coordJob, newEndTime);
                    }
                }
                else {
                    // NOTE(review): the value logged as "Coord endTime" is the requested newEndTime, not the
                    // job's current end time -- message kept as-is, confirm whether that is intended.
                    LOG.info("Didn't change endtime. Endtime is in between coord end time and next materialization time."
                            + "Coord endTime = " + DateUtils.formatDateOozieTZ(newEndTime)
                            + " next materialization time ="
                            + DateUtils.formatDateOozieTZ(coordJob.getNextMaterializedTime()));
                }
            }

            if (newConcurrency != null) {
                this.coordJob.setConcurrency(newConcurrency);
            }

            if (newPauseTime != null || resetPauseTime) {
                // newPauseTime is null here exactly when the pause time is being cleared.
                this.coordJob.setPauseTime(newPauseTime);
                // Un-pause when a previously set pause time is cleared or pushed further into the future.
                if (oldPauseTime != null && (newPauseTime == null || oldPauseTime.before(newPauseTime))) {
                    resumeIfPaused();
                }
                if (!resetPauseTime) {
                    processLookaheadActions(coordJob, newPauseTime);
                }
            }

            if (jobStatus != null) {
                coordJob.setStatus(jobStatus);
                LOG.info("Coord status is changed to " + jobStatus + " from " + prevStatus);
                if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) {
                    coordJob.setPending();
                    if (coordJob.getNextMaterializedTime() != null
                            && coordJob.getEndTime().after(coordJob.getNextMaterializedTime())) {
                        coordJob.resetDoneMaterialization();
                    }
                }
                else if (jobStatus.equals(CoordinatorJob.Status.IGNORED)) {
                    coordJob.resetPending();
                    coordJob.setDoneMaterialization();
                }
            }

            if (coordJob.getNextMaterializedTime() != null
                    && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
                // NOTE(review): the message says "set pending to true" but the code below only sets
                // doneMaterialization -- message kept as-is.
                LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = "
                        + coordJob.getStatus() + ", set pending to true");
                // set doneMaterialization to true when materialization is done
                coordJob.setDoneMaterialization();
            }

            coordJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);

            return null;
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
        finally {
            LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
            // update bundle action
            if (coordJob.getBundleId() != null) {
                // ignore pending as it's a sync command
                BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus,
                        true);
                bundleStatusUpdate.call();
            }
        }
    }

    /**
     * @return the coordinator job id this command operates on.
     * @see org.apache.oozie.command.XCommand#getEntityKey()
     */
    @Override
    public String getEntityKey() {
        return this.jobId;
    }

    /**
     * Load the coordinator job from the database and remember its current pause time and status.
     *
     * @throws CommandException with E0610 if no {@link JPAService} is available, or wrapping the
     *         {@link JPAExecutorException} raised while loading the job
     * @see org.apache.oozie.command.XCommand#loadState()
     */
    @Override
    protected void loadState() throws CommandException {
        jpaService = Services.get().get(JPAService.class);

        if (jpaService == null) {
            throw new CommandException(ErrorCode.E0610);
        }

        try {
            this.coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId));
            oldPauseTime = coordJob.getPauseTime();
            prevStatus = coordJob.getStatus();
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }

        LogUtils.setLogInfo(this.coordJob, logInfo);
    }

    /**
     * Validate the requested changes against the loaded job.
     *
     * @throws CommandException if the requested values are not valid for the job's current state
     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        check(this.coordJob, newEndTime, newConcurrency, newPauseTime, jobStatus);
    }

    /**
     * @return always {@code true}.
     * @see org.apache.oozie.command.XCommand#isLockRequired()
     */
    @Override
    protected boolean isLockRequired() {
        return true;
    }
}