001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.oozie.AppType; 024import org.apache.oozie.CoordinatorActionBean; 025import org.apache.oozie.CoordinatorJobBean; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.SLAEventBean; 028import org.apache.oozie.client.CoordinatorJob; 029import org.apache.oozie.client.Job; 030import org.apache.oozie.client.SLAEvent.SlaAppType; 031import org.apache.oozie.client.rest.JsonBean; 032import org.apache.oozie.command.CommandException; 033import org.apache.oozie.command.MaterializeTransitionXCommand; 034import org.apache.oozie.command.PreconditionException; 035import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; 036import org.apache.oozie.coord.CoordUtils; 037import org.apache.oozie.coord.TimeUnit; 038import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil; 039import org.apache.oozie.executor.jpa.BatchQueryExecutor; 040import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 041import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor; 042import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 043import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 044import org.apache.oozie.executor.jpa.JPAExecutorException; 045import org.apache.oozie.service.ConfigurationService; 046import org.apache.oozie.service.CoordMaterializeTriggerService; 047import org.apache.oozie.service.EventHandlerService; 048import org.apache.oozie.service.JPAService; 049import org.apache.oozie.service.Service; 050import org.apache.oozie.service.Services; 051import org.apache.oozie.sla.SLAOperations; 052import org.apache.oozie.util.DateUtils; 053import org.apache.oozie.util.Instrumentation; 054import org.apache.oozie.util.LogUtils; 055import org.apache.oozie.util.ParamChecker; 056import org.apache.oozie.util.StatusUtils; 057import org.apache.oozie.util.XConfiguration; 058import org.apache.oozie.util.XmlUtils; 059import org.apache.oozie.util.db.SLADbOperations; 060import org.jdom.Element; 061import org.jdom.JDOMException; 062 063import java.io.IOException; 064import java.io.StringReader; 065import java.sql.Timestamp; 066import java.util.Calendar; 067import java.util.Date; 068import java.util.TimeZone; 069 070/** 071 * Materialize actions for specified start and end time for coordinator job. 072 */ 073@SuppressWarnings("deprecation") 074public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand { 075 076 private JPAService jpaService = null; 077 private CoordinatorJobBean coordJob = null; 078 private String jobId = null; 079 private Date startMatdTime = null; 080 private Date endMatdTime = null; 081 private final int materializationWindow; 082 private int lastActionNumber = 1; // over-ride by DB value 083 private CoordinatorJob.Status prevStatus = null; 084 085 static final private int lookAheadWindow = ConfigurationService.getInt(CoordMaterializeTriggerService 086 .CONF_LOOKUP_INTERVAL); 087 088 /** 089 * Default MAX timeout in minutes, after which coordinator input check will timeout 090 */ 091 public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout"; 092 093 /** 094 * The constructor for class {@link CoordMaterializeTransitionXCommand} 095 * 096 * @param jobId coordinator job id 097 * @param materializationWindow materialization window to calculate end time 098 */ 099 public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) { 100 super("coord_mater", "coord_mater", 1); 101 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 102 this.materializationWindow = materializationWindow; 103 } 104 105 public CoordMaterializeTransitionXCommand(CoordinatorJobBean coordJob, int materializationWindow, Date startTime, 106 Date endTime) { 107 super("coord_mater", "coord_mater", 1); 108 this.jobId = ParamChecker.notEmpty(coordJob.getId(), "jobId"); 109 this.materializationWindow = materializationWindow; 110 this.coordJob = coordJob; 111 this.startMatdTime = startTime; 112 this.endMatdTime = endTime; 113 } 114 115 /* (non-Javadoc) 116 * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext() 117 */ 118 @Override 119 public void transitToNext() throws CommandException { 120 } 121 122 /* (non-Javadoc) 123 * @see org.apache.oozie.command.TransitionXCommand#updateJob() 124 */ 125 @Override 126 public void updateJob() throws CommandException { 127 updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE,coordJob)); 128 } 129 130 /* (non-Javadoc) 131 * @see org.apache.oozie.command.MaterializeTransitionXCommand#performWrites() 132 */ 133 @Override 134 public void performWrites() throws CommandException { 135 try { 136 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 137 // register the partition related dependencies of actions 138 for (JsonBean actionBean : insertList) { 139 if (actionBean instanceof CoordinatorActionBean) { 140 CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean; 141 if (EventHandlerService.isEnabled()) { 142 CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null); 143 } 144 145 // TODO: time 100s should be configurable 146 queue(new CoordActionNotificationXCommand(coordAction), 100); 147 148 //Delay for input check = (nominal time - now) 149 long checkDelay = coordAction.getNominalTime().getTime() - new Date().getTime(); 150 queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 151 Math.max(checkDelay, 0)); 152 153 if (!StringUtils.isEmpty(coordAction.getPushMissingDependencies())) { 154 // TODO: Delay in catchup mode? 155 queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100); 156 } 157 } 158 } 159 } 160 catch (JPAExecutorException jex) { 161 throw new CommandException(jex); 162 } 163 } 164 165 /* (non-Javadoc) 166 * @see org.apache.oozie.command.XCommand#getEntityKey() 167 */ 168 @Override 169 public String getEntityKey() { 170 return this.jobId; 171 } 172 173 @Override 174 protected boolean isLockRequired() { 175 return true; 176 } 177 178 /* (non-Javadoc) 179 * @see org.apache.oozie.command.XCommand#loadState() 180 */ 181 @Override 182 protected void loadState() throws CommandException { 183 jpaService = Services.get().get(JPAService.class); 184 if (jpaService == null) { 185 LOG.error(ErrorCode.E0610); 186 } 187 188 try { 189 coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_MATERIALIZE, jobId); 190 prevStatus = coordJob.getStatus(); 191 } 192 catch (JPAExecutorException jex) { 193 throw new CommandException(jex); 194 } 195 196 // calculate start materialize and end materialize time 197 calcMatdTime(); 198 199 LogUtils.setLogInfo(coordJob); 200 } 201 202 /** 203 * Calculate startMatdTime and endMatdTime from job's start time if next materialized time is null 204 * 205 * @throws CommandException thrown if failed to calculate startMatdTime and endMatdTime 206 */ 207 protected void calcMatdTime() throws CommandException { 208 Timestamp startTime = coordJob.getNextMaterializedTimestamp(); 209 if (startTime == null) { 210 startTime = coordJob.getStartTimestamp(); 211 } 212 // calculate end time by adding materializationWindow to start time. 213 // need to convert materializationWindow from secs to milliseconds 214 long startTimeMilli = startTime.getTime(); 215 long endTimeMilli = startTimeMilli + (materializationWindow * 1000); 216 217 startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli)); 218 endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli)); 219 endMatdTime = getMaterializationTimeForCatchUp(endMatdTime); 220 // if MaterializationWindow end time is greater than endTime 221 // for job, then set it to endTime of job 222 Date jobEndTime = coordJob.getEndTime(); 223 if (endMatdTime.compareTo(jobEndTime) > 0) { 224 endMatdTime = jobEndTime; 225 } 226 227 LOG.debug("Materializing coord job id=" + jobId + ", start=" + DateUtils.formatDateOozieTZ(startMatdTime) + ", end=" + DateUtils.formatDateOozieTZ(endMatdTime) 228 + ", window=" + materializationWindow); 229 } 230 231 /** 232 * Get materialization for window for catch-up jobs. for current jobs,it reruns currentMatdate, For catch-up, end 233 * Mataterilized Time = startMatdTime + MatThrottling * frequency; unless LAST_ONLY execution order is set, in which 234 * case it returns now (to materialize all actions in the past) 235 * 236 * @param currentMatTime 237 * @return 238 * @throws CommandException 239 * @throws JDOMException 240 */ 241 private Date getMaterializationTimeForCatchUp(Date currentMatTime) throws CommandException { 242 if (currentMatTime.after(new Date())) { 243 return currentMatTime; 244 } 245 if (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) || 246 coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) { 247 return new Date(); 248 } 249 int frequency = 0; 250 try { 251 frequency = Integer.parseInt(coordJob.getFrequency()); 252 } 253 catch (NumberFormatException e) { 254 return currentMatTime; 255 } 256 257 TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); 258 TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr()); 259 Calendar startInstance = Calendar.getInstance(appTz); 260 startInstance.setTime(startMatdTime); 261 Calendar endMatInstance = null; 262 Calendar previousInstance = startInstance; 263 for (int i = 1; i <= coordJob.getMatThrottling(); i++) { 264 endMatInstance = (Calendar) startInstance.clone(); 265 endMatInstance.add(freqTU.getCalendarUnit(), i * frequency); 266 if (endMatInstance.getTime().compareTo(new Date()) >= 0) { 267 if (previousInstance.getTime().after(currentMatTime)) { 268 return previousInstance.getTime(); 269 } 270 else { 271 return currentMatTime; 272 } 273 } 274 previousInstance = endMatInstance; 275 } 276 if (endMatInstance == null) { 277 return currentMatTime; 278 } 279 else { 280 return endMatInstance.getTime(); 281 } 282 } 283 284 /* (non-Javadoc) 285 * @see org.apache.oozie.command.XCommand#verifyPrecondition() 286 */ 287 @Override 288 protected void verifyPrecondition() throws CommandException, PreconditionException { 289 if (!(coordJob.getStatus() == CoordinatorJobBean.Status.PREP || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNING 290 || coordJob.getStatus() == CoordinatorJobBean.Status.RUNNINGWITHERROR)) { 291 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId 292 + " job is not in PREP or RUNNING but in " + coordJob.getStatus()); 293 } 294 295 if (coordJob.isDoneMaterialization()) { 296 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId =" + jobId 297 + " job is already materialized"); 298 } 299 300 if (coordJob.getNextMaterializedTimestamp() != null 301 && coordJob.getNextMaterializedTimestamp().compareTo(coordJob.getEndTimestamp()) >= 0) { 302 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId 303 + " job is already materialized"); 304 } 305 306 Timestamp startTime = coordJob.getNextMaterializedTimestamp(); 307 if (startTime == null) { 308 startTime = coordJob.getStartTimestamp(); 309 310 if (startTime.after(new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) { 311 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" 312 + jobId + " job's start time is not reached yet - nothing to materialize"); 313 } 314 } 315 316 if (coordJob.getNextMaterializedTimestamp() != null 317 && coordJob.getNextMaterializedTimestamp().after( 318 new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) { 319 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId=" + jobId 320 + " Request is for future time. Lookup time is " 321 + new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000) + " mat time is " 322 + coordJob.getNextMaterializedTimestamp()); 323 } 324 325 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(coordJob.getEndTime()) >= 0) { 326 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 327 + ", all actions have been materialized from start time = " + coordJob.getStartTime() 328 + " to end time = " + coordJob.getEndTime() + ", job status = " + coordJob.getStatusStr()); 329 } 330 331 if (coordJob.getLastActionTime() != null && coordJob.getLastActionTime().compareTo(endMatdTime) >= 0) { 332 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 333 + ", action is *already* materialized for Materialization start time = " + startMatdTime 334 + ", materialization end time = " + endMatdTime + ", job status = " + coordJob.getStatusStr()); 335 } 336 337 if (endMatdTime.after(coordJob.getEndTime())) { 338 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 339 + " materialization end time = " + endMatdTime + " surpasses coordinator job's end time = " 340 + coordJob.getEndTime() + " job status = " + coordJob.getStatusStr()); 341 } 342 343 if (coordJob.getPauseTime() != null && !startMatdTime.before(coordJob.getPauseTime())) { 344 throw new PreconditionException(ErrorCode.E1100, "ENDED Coordinator materialization for jobId = " + jobId 345 + ", materialization start time = " + startMatdTime 346 + " is after or equal to coordinator job's pause time = " + coordJob.getPauseTime() 347 + ", job status = " + coordJob.getStatusStr()); 348 } 349 350 } 351 352 /* (non-Javadoc) 353 * @see org.apache.oozie.command.MaterializeTransitionXCommand#materialize() 354 */ 355 @Override 356 protected void materialize() throws CommandException { 357 Instrumentation.Cron cron = new Instrumentation.Cron(); 358 cron.start(); 359 try { 360 materializeActions(false); 361 updateJobMaterializeInfo(coordJob); 362 } 363 catch (CommandException ex) { 364 LOG.warn("Exception occurred:" + ex.getMessage() + " Making the job failed ", ex); 365 coordJob.setStatus(Job.Status.FAILED); 366 coordJob.resetPending(); 367 // remove any materialized actions and slaEvents 368 insertList.clear(); 369 } 370 catch (Exception e) { 371 LOG.error("Exception occurred:" + e.getMessage() + " Making the job failed ", e); 372 coordJob.setStatus(Job.Status.FAILED); 373 try { 374 CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, coordJob); 375 } 376 catch (JPAExecutorException jex) { 377 throw new CommandException(ErrorCode.E1011, jex); 378 } 379 throw new CommandException(ErrorCode.E1012, e.getMessage(), e); 380 } finally { 381 cron.stop(); 382 instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".materialize", cron); 383 } 384 } 385 386 /** 387 * Create action instances starting from "startMatdTime" to "endMatdTime" and store them into coord action table. 388 * 389 * @param dryrun if this is a dry run 390 * @throws Exception thrown if failed to materialize actions 391 */ 392 protected String materializeActions(boolean dryrun) throws Exception { 393 394 Configuration jobConf = null; 395 try { 396 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 397 } 398 catch (IOException ioe) { 399 LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 400 throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe); 401 } 402 403 String jobXml = coordJob.getJobXml(); 404 Element eJob = XmlUtils.parseXml(jobXml); 405 TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); 406 407 String frequency = coordJob.getFrequency(); 408 TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr()); 409 TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration")); 410 Calendar start = Calendar.getInstance(appTz); 411 start.setTime(startMatdTime); 412 DateUtils.moveToEnd(start, endOfFlag); 413 Calendar end = Calendar.getInstance(appTz); 414 end.setTime(endMatdTime); 415 lastActionNumber = coordJob.getLastActionNumber(); 416 //Intentionally printing dates in their own timezone, not Oozie timezone 417 LOG.info("materialize actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime() + ", end=" 418 + end.getTime() + ",\n timeUnit " + freqTU.getCalendarUnit() + ",\n frequency :" + frequency + ":" 419 + freqTU + ",\n lastActionNumber " + lastActionNumber); 420 // Keep the actual start time 421 Calendar origStart = Calendar.getInstance(appTz); 422 origStart.setTime(coordJob.getStartTimestamp()); 423 // Move to the End of duration, if needed. 424 DateUtils.moveToEnd(origStart, endOfFlag); 425 426 StringBuilder actionStrings = new StringBuilder(); 427 Date jobPauseTime = coordJob.getPauseTime(); 428 Calendar pause = null; 429 if (jobPauseTime != null) { 430 pause = Calendar.getInstance(appTz); 431 pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime)); 432 } 433 434 String action = null; 435 int numWaitingActions = dryrun ? 0 : jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId())); 436 int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions; 437 // If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated 438 boolean ignoreMaxActions = 439 (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) || 440 coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) 441 && endMatdTime.before(new Date()); 442 LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated 443 + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions); 444 445 boolean isCronFrequency = false; 446 447 Calendar effStart = (Calendar) start.clone(); 448 try { 449 int intFrequency = Integer.parseInt(coordJob.getFrequency()); 450 effStart = (Calendar) origStart.clone(); 451 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * intFrequency); 452 } 453 catch (NumberFormatException e) { 454 isCronFrequency = true; 455 } 456 457 boolean firstMater = true; 458 while (effStart.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) { 459 if (pause != null && effStart.compareTo(pause) >= 0) { 460 break; 461 } 462 463 Date nextTime = effStart.getTime(); 464 465 if (isCronFrequency) { 466 if (effStart.getTime().compareTo(startMatdTime) == 0 && firstMater) { 467 effStart.add(Calendar.MINUTE, -1); 468 firstMater = false; 469 } 470 471 nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob); 472 effStart.setTime(nextTime); 473 } 474 475 if (effStart.compareTo(end) < 0) { 476 477 if (pause != null && effStart.compareTo(pause) >= 0) { 478 break; 479 } 480 CoordinatorActionBean actionBean = new CoordinatorActionBean(); 481 lastActionNumber++; 482 483 int timeout = coordJob.getTimeout(); 484 LOG.debug("Materializing action for time=" + DateUtils.formatDateOozieTZ(effStart.getTime()) 485 + ", lastactionnumber=" + lastActionNumber + " timeout=" + timeout + " minutes"); 486 Date actualTime = new Date(); 487 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), 488 nextTime, actualTime, lastActionNumber, jobConf, actionBean); 489 actionBean.setTimeOut(timeout); 490 if (!dryrun) { 491 storeToDB(actionBean, action, jobConf); // Storing to table 492 493 } 494 else { 495 actionStrings.append("action for new instance"); 496 actionStrings.append(action); 497 } 498 } 499 else { 500 break; 501 } 502 503 if (!isCronFrequency) { 504 effStart = (Calendar) origStart.clone(); 505 effStart.add(freqTU.getCalendarUnit(), lastActionNumber * Integer.parseInt(coordJob.getFrequency())); 506 } 507 } 508 509 if (isCronFrequency) { 510 if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) { 511 //Since we exceed the throttle, we need to move the nextMadtime forward 512 //to avoid creating duplicate actions 513 if (!firstMater) { 514 effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob)); 515 } 516 } 517 } 518 519 endMatdTime = effStart.getTime(); 520 521 if (!dryrun) { 522 return action; 523 } 524 else { 525 return actionStrings.toString(); 526 } 527 } 528 529 private void storeToDB(CoordinatorActionBean actionBean, String actionXml, Configuration jobConf) throws Exception { 530 LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = " 531 + actionXml.length()); 532 actionBean.setActionXml(actionXml); 533 insertList.add(actionBean); 534 writeActionSlaRegistration(actionXml, actionBean, jobConf); 535 } 536 537 private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean, Configuration jobConf) 538 throws Exception { 539 Element eAction = XmlUtils.parseXml(actionXml); 540 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 541 SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), 542 SlaAppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getGroup(), LOG); 543 if (slaEvent != null) { 544 insertList.add(slaEvent); 545 } 546 // inserting into new table also 547 SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(), 548 AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false, 549 CoordUtils.isSlaAlertDisabled(actionBean, coordJob.getAppName(), jobConf)); 550 } 551 552 private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException { 553 job.setLastActionTime(endMatdTime); 554 job.setLastActionNumber(lastActionNumber); 555 // if the job endtime == action endtime, we don't need to materialize this job anymore 556 Date jobEndTime = job.getEndTime(); 557 558 559 if (job.getStatus() == CoordinatorJob.Status.PREP){ 560 LOG.info("[" + job.getId() + "]: Update status from " + job.getStatus() + " to RUNNING"); 561 job.setStatus(Job.Status.RUNNING); 562 } 563 job.setPending(); 564 565 if (jobEndTime.compareTo(endMatdTime) <= 0) { 566 LOG.info("[" + job.getId() + "]: all actions have been materialized, set pending to true"); 567 // set doneMaterialization to true when materialization is done 568 job.setDoneMaterialization(); 569 } 570 job.setStatus(StatusUtils.getStatus(job)); 571 LOG.info("Coord Job status updated to = " + job.getStatus()); 572 job.setNextMaterializedTime(endMatdTime); 573 } 574 575 /* (non-Javadoc) 576 * @see org.apache.oozie.command.XCommand#getKey() 577 */ 578 @Override 579 public String getKey() { 580 return getName() + "_" + jobId; 581 } 582 583 /* (non-Javadoc) 584 * @see org.apache.oozie.command.TransitionXCommand#notifyParent() 585 */ 586 @Override 587 public void notifyParent() throws CommandException { 588 // update bundle action only when status changes in coord job 589 if (this.coordJob.getBundleId() != null) { 590 if (!prevStatus.equals(coordJob.getStatus())) { 591 BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); 592 bundleStatusUpdate.call(); 593 } 594 } 595 } 596}