/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.sla;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;

import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusActualTimesJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.XLog;


/**
 * Implementation class for SLACalculator that calculates SLA related to
 * start/end/duration of jobs using a memory-based map.
 * <p>
 * Progress of a job through its SLA checks is tracked in the "event processed"
 * value, which this class manipulates as follows:
 * <ul>
 * <li>bit 0 (value 1): the start condition has been processed</li>
 * <li>bit 1 (value 2): the duration condition has been processed</li>
 * <li>value 4 and above: the end condition has been processed</li>
 * <li>value 7: start, duration and end are all processed; the job is moved from
 * the in-memory map to the history set until its final status arrives</li>
 * <li>value 8: nothing more to process; the job is tracked neither in the map
 * nor in the history set</li>
 * </ul>
 */
public class SLACalculatorMemory implements SLACalculator {

    private static final XLog LOG = XLog.getLog(SLACalculatorMemory.class);
    // TODO optimization priority based insertion/processing/bumping up-down
    // NOTE: the fields below are static but (re)assigned by init(), so they are
    // shared by every instance of this class.
    private static Map<String, SLACalcStatus> slaMap;
    private static Set<String> historySet;
    private static int capacity;
    private static JPAService jpaService;
    private EventHandlerService eventHandler;
    private static int modifiedAfter;
    // Grace period in milliseconds (it is compared against System.currentTimeMillis())
    // granted before a missing job event is treated as an SLA miss.
    private static long jobEventLatency;

    /**
     * Reads the calculator settings, creates fresh tracking structures, looks up
     * the JPA and event-handler services and reloads pending SLA records from the DB.
     *
     * @param conf configuration supplying capacity, job event latency and the
     *        "events modified after" window (unit not visible here - presumably days)
     * @throws ServiceException declared by the interface
     */
    @Override
    public void init(Configuration conf) throws ServiceException {
        capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000);
        jobEventLatency = conf.getInt(SLAService.CONF_JOB_EVENT_LATENCY, 90 * 1000);
        slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
        historySet = Collections.synchronizedSet(new HashSet<String>());
        jpaService = Services.get().get(JPAService.class);
        eventHandler = Services.get().get(EventHandlerService.class);
        // load events modified after
        modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
        loadOnRestart();

    }

    /**
     * Rebuilds the in-memory state from the SLA summary records in the DB.
     * Each record is first reconciled with the current state of its job (and
     * written back if it changed), then placed in the history set (event
     * processed == 7) or in the SLA map (event processed &lt; 7). Failures for a
     * single record are logged and do not stop the load.
     */
    private void loadOnRestart() {
        try {
            long slaPendingCount = 0;
            long statusPendingCount = 0;
            List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
                    modifiedAfter));
            for (SLASummaryBean summaryBean : summaryBeans) {
                String jobId = summaryBean.getId();
                try {
                    // Scoped per record so that an unhandled app type never inherits
                    // the "modified" verdict of the previously processed record.
                    boolean isJobModified = false;
                    switch (summaryBean.getAppType()) {
                        case COORDINATOR_ACTION:
                            isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
                            break;
                        case WORKFLOW_ACTION:
                            isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
                            break;
                        case WORKFLOW_JOB:
                            isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
                            break;
                        default:
                            break;
                    }
                    if (isJobModified) {
                        jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(summaryBean));
                    }
                }
                catch (Exception e) {
                    LOG.warn("Failed to load records for " + jobId, e);
                }
                try {
                    if (summaryBean.getEventProcessed() == 7) {
                        historySet.add(jobId);
                        statusPendingCount++;
                    }
                    else if (summaryBean.getEventProcessed() < 7) {
                        SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
                                jobId));
                        SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
                        slaMap.put(jobId, slaCalcStatus);
                        slaPendingCount++;
                    }
                    // event processed > 7: nothing left to track for this job
                }
                catch (Exception e) {
                    LOG.warn("Failed to fetch/update records for " + jobId, e);
                }

            }
            LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount);

        }
        catch (Exception e) {
            LOG.warn("Failed to retrieve SLASummary records on restart", e);
        }
    }

    /**
     * Reconciles a summary record with the current state of its coordinator action.
     *
     * @param summaryBean summary record to update in place
     * @param jobId id of the coordinator action
     * @return true if the action status differs from the recorded one and the record was changed
     * @throws JPAExecutorException if a DB lookup fails
     */
    private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId)
            throws JPAExecutorException {
        boolean isJobModified = false;
        CoordinatorActionBean coordAction = null;
        coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId));
        if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
            LOG.trace("Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is "
                    + summaryBean.getJobStatus());
            isJobModified = true;
            summaryBean.setJobStatus(coordAction.getStatusStr());
            if (coordAction.isTerminalStatus()) {
                // start time comes from the child workflow, end time from the action's last modification
                WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
                        .getExternalId()));
                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(),
                        coordAction.getStatusStr());
            }
            else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) {
                WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
                        .getExternalId()));
                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
            }
        }
        return isJobModified;
    }

    /**
     * Reconciles a summary record with the current state of its workflow action.
     *
     * @param summaryBean summary record to update in place
     * @param jobId id of the workflow action
     * @return true if the action status differs from the recorded one and the record was changed
     * @throws JPAExecutorException if the DB lookup fails
     */
    private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId)
            throws JPAExecutorException {
        boolean isJobModified = false;
        WorkflowActionBean wfAction = null;
        wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId));
        if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
            LOG.trace("Workflow action status is " + wfAction.getStatusStr() + " and summary bean status is "
                    + summaryBean.getJobStatus());
            isJobModified = true;
            summaryBean.setJobStatus(wfAction.getStatusStr());
            if (wfAction.inTerminalState()) {
                setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
            }
            else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime());
            }
        }
        return isJobModified;
    }

    /**
     * Reconciles a summary record with the current state of its workflow job.
     *
     * @param summaryBean summary record to update in place
     * @param jobId id of the workflow job
     * @return true if the job status differs from the recorded one and the record was changed
     * @throws JPAExecutorException if the DB lookup fails
     */
    private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId)
            throws JPAExecutorException {
        boolean isJobModified = false;
        WorkflowJobBean wfJob = null;
        wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
        if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
            LOG.trace("Workflow job status is " + wfJob.getStatusStr() + " and summary bean status is "
                    + summaryBean.getJobStatus());
            isJobModified = true;
            summaryBean.setJobStatus(wfJob.getStatusStr());
            if (wfJob.inTerminalState()) {
                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
            }
            else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
            }
        }
        return isJobModified;
    }

    /**
     * Marks a summary record as finished (event processed = 8) and records its
     * actual times. If the end condition had not been processed yet, the SLA
     * status is set to MET only for a successful status that ended no later
     * than the expected end; otherwise it is set to MISS.
     *
     * @param summaryBean summary record to update in place
     * @param startTime actual start, may be null when the job ended before starting
     * @param endTime actual end
     * @param status terminal status name of the job
     */
    private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) {
        byte eventProc = summaryBean.getEventProcessed();
        summaryBean.setEventProcessed(8);
        summaryBean.setActualStart(startTime);
        summaryBean.setActualEnd(endTime);
        // A job that terminated before it ever started has no start time; a duration
        // cannot be derived then, and must not abort the update of the record.
        if (startTime != null && endTime != null) {
            long actualDuration = endTime.getTime() - startTime.getTime();
            summaryBean.setActualDuration(actualDuration);
        }
        if (eventProc < 4) {
            if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || status.equals(WorkflowAction.Status.OK.name())
                    || status.equals(CoordinatorAction.Status.SUCCEEDED.name())) {
                if (endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) {
                    summaryBean.setSLAStatus(SLAStatus.MET);
                }
                else {
                    summaryBean.setSLAStatus(SLAStatus.MISS);
                }
            }
            else {
                summaryBean.setSLAStatus(SLAStatus.MISS);
            }
        }

    }

    /**
     * Records that a job has started: sets the start-processed bit if it is
     * unset, moves the SLA status from NOT_STARTED to IN_PROCESS and stores the
     * actual start time.
     *
     * @param summaryBean summary record to update in place
     * @param eventProc current event processed value of the record
     * @param startTime actual start time of the job
     */
    private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) {
        if (((eventProc & 1) == 0)) {
            eventProc += 1;
            summaryBean.setEventProcessed(eventProc);
        }
        if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
            summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
        }
        summaryBean.setActualStart(startTime);
    }

    /**
     * @return number of jobs currently held in the in-memory SLA map (the history set is not counted)
     */
    @Override
    public int size() {
        return slaMap.size();
    }

    /**
     * Returns the SLA state of a job. Jobs in the in-memory map are returned
     * directly; jobs found only in the history set are rebuilt from the DB.
     *
     * @param jobId id of the job
     * @return the SLA state, or null if the job is in neither the map nor the history set
     * @throws JPAExecutorException if the DB lookup for a history job fails
     */
    @Override
    public SLACalcStatus get(String jobId) throws JPAExecutorException {
        SLACalcStatus memObj;
        memObj = slaMap.get(jobId);
        if (memObj == null && historySet.contains(jobId)) {
            memObj = new SLACalcStatus(jpaService.execute(new SLASummaryGetJPAExecutor(jobId)),
                    jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(jobId)));
        }
        return memObj;
    }

    /**
     * @return iterator over the ids of the jobs in the in-memory SLA map
     */
    @Override
    public Iterator<String> iterator() {
        return slaMap.keySet().iterator();
    }

    /**
     * @return true if the in-memory SLA map holds no jobs
     */
    @Override
    public boolean isEmpty() {
        return slaMap.isEmpty();
    }

    /**
     * Empties both the in-memory SLA map and the history set.
     */
    @Override
    public void clear() {
        slaMap.clear();
        historySet.clear();
    }

    /**
     * Invoked via periodic run, update the SLA for registered jobs.
     * <p>
     * For each of the start, duration and end conditions that is still
     * unprocessed and whose deadline (plus the job event latency) has passed,
     * the state is confirmed against the DB before a miss is raised. Any change
     * is persisted to the SLA summary record.
     *
     * @param jobId id of the job to evaluate; ignored if it is no longer tracked in the map
     * @throws JPAExecutorException if persisting the summary fails
     * @throws ServiceException declared for callers; not thrown directly here
     */
    protected void updateJobSla(String jobId) throws JPAExecutorException, ServiceException {
        SLACalcStatus slaCalc = slaMap.get(jobId);
        if (slaCalc == null) {
            // The job was removed from the map (by addJobStatus or removeRegistration)
            // after the periodic scan picked up its id; nothing left to evaluate.
            return;
        }
        synchronized (slaCalc) {
            boolean change = false;
            byte eventProc = slaCalc.getEventProcessed();
            SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
            // calculation w.r.t current time and status
            if ((eventProc & 1) == 0) { // first bit (start-processed) unset
                if (reg.getExpectedStart() != null) {
                    if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
                        confirmWithDB(slaCalc);
                        eventProc = slaCalc.getEventProcessed();
                        if (eventProc != 8 && (eventProc & 1) == 0) {
                            //Some DB exception
                            slaCalc.setEventStatus(EventStatus.START_MISS);
                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                            eventProc++;
                        }
                        change = true;
                    }
                }
                else {
                    eventProc++; //disable further processing for optional start sla condition
                    change = true;
                }
            }
            if (((eventProc >> 1) & 1) == 0 && eventProc != 8) { // check if second bit (duration-processed) is unset
                if (reg.getExpectedDuration() == -1) {
                    // -1 means no duration SLA was requested; mark it processed
                    eventProc += 2;
                    change = true;
                }
                else if (slaCalc.getActualStart() != null) {
                    if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
                            .getActualStart().getTime())) {
                        // publish the locally advanced value so confirmWithDB sees it
                        slaCalc.setEventProcessed(eventProc);
                        confirmWithDB(slaCalc);
                        eventProc = slaCalc.getEventProcessed();
                        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
                            //Some DB exception
                            slaCalc.setEventStatus(EventStatus.DURATION_MISS);
                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                            eventProc += 2;
                        }
                        change = true;
                    }
                }
            }
            if (eventProc < 4) { // end condition not processed yet
                if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
                    slaCalc.setEventProcessed(eventProc);
                    confirmWithDB(slaCalc);
                    eventProc = slaCalc.getEventProcessed();
                    change = true;
                }
            }
            if (change) {
                if (slaCalc.getEventProcessed() >= 8) { //no more processing, no transfer to history set
                    eventProc = 8;
                    slaCalc.setEventProcessed(8); // Should not be > 8. But to handle any corner cases.
                    slaMap.remove(jobId);
                }
                else {
                    slaCalc.setEventProcessed(eventProc);
                }
                SLASummaryBean slaSummaryBean = new SLASummaryBean();
                slaSummaryBean.setId(slaCalc.getId());
                slaSummaryBean.setEventProcessed(eventProc);
                slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
                slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
                slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
                slaSummaryBean.setActualStart(slaCalc.getActualStart());
                slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
                slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
                jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaSummaryBean));
                if (eventProc == 7) {
                    // all SLA conditions processed; only the final job status is still awaited
                    historySet.add(jobId);
                    slaMap.remove(jobId);
                    LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
                }

            }
        }
    }

    /**
     * Periodically run by the SLAService worker threads to update SLA status by
     * iterating through all the jobs in the map. A failure for one job is
     * logged and does not stop the processing of the remaining jobs.
     */
    @Override
    public void updateAllSlaStatus() {
        LOG.info("Running periodic SLA check");
        Iterator<String> iterator = slaMap.keySet().iterator();
        while (iterator.hasNext()) {
            String jobId = iterator.next();
            try {
                LOG.trace("Processing SLA for jobid={0}", jobId);
                updateJobSla(jobId);
            }
            catch (Exception e) {
                LOG.error("Exception in SLA processing for job [{0}]", jobId, e);
            }
        }
    }

    /**
     * Register a new job into the map for SLA tracking and insert its
     * registration and summary records into the DB.
     *
     * @param jobId id of the job
     * @param reg SLA registration of the job
     * @return true if the job was registered, false if the map has reached its capacity
     * @throws JPAExecutorException if the DB insert fails
     */
    @Override
    public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
        return trackRegistration(jobId, reg, true);
    }

    /**
     * Starts tracking a registration in memory and persists it together with a
     * fresh summary record, shared by add and update.
     *
     * @param jobId id of the job
     * @param reg SLA registration of the job
     * @param isNew true to insert the records, false to update existing ones
     * @return true if the job is now tracked, false if the map has reached its capacity
     * @throws JPAExecutorException if the DB operation fails
     */
    private boolean trackRegistration(String jobId, SLARegistrationBean reg, boolean isNew)
            throws JPAExecutorException {
        if (slaMap.size() >= capacity) {
            LOG.error(
                    "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
                    reg.getId());
            return false;
        }
        SLACalcStatus slaCalc = new SLACalcStatus(reg);
        slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
        slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
        slaMap.put(jobId, slaCalc);
        List<JsonBean> beans = new ArrayList<JsonBean>();
        beans.add(reg);
        beans.add(new SLASummaryBean(slaCalc));
        if (isNew) {
            jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(beans, null));
        }
        else {
            jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, beans));
        }
        LOG.trace("SLA Registration Event - Job:" + jobId);
        return true;
    }

    /**
     * Returns the name of the status a freshly registered job of the given type is assumed to be in.
     *
     * @param appType type of the job
     * @return WAITING for coordinator actions, PREP for workflow actions and jobs, null for any other type
     */
    private String getJobStatus(AppType appType) {
        String status = null;
        switch (appType) {
            case COORDINATOR_ACTION:
                status = CoordinatorAction.Status.WAITING.name();
                break;
            case WORKFLOW_ACTION:
                status = WorkflowAction.Status.PREP.name();
                break;
            case WORKFLOW_JOB:
                status = WorkflowJob.Status.PREP.name();
                break;
            default:
                break;
        }
        return status;
    }

    /**
     * Update job into the map for SLA tracking: the in-memory state is replaced
     * by a fresh NOT_STARTED one and the registration and summary records are
     * updated in the DB.
     *
     * @param jobId id of the job
     * @param reg new SLA registration of the job
     * @return true if the job was updated, false if the map has reached its capacity
     * @throws JPAExecutorException if the DB update fails
     */
    @Override
    public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
        return trackRegistration(jobId, reg, false);
    }

    /**
     * Remove job from being tracked in map; if it is not in the map it is
     * removed from the history set instead.
     *
     * @param jobId id of the job
     */
    @Override
    public void removeRegistration(String jobId) {
        if (slaMap.remove(jobId) == null) {
            historySet.remove(jobId);
        }
    }

    /**
     * Triggered after receiving Job status change event, update SLA status
     * accordingly and persist the resulting summary.
     *
     * @param jobId id of the job
     * @param jobStatus new status of the job
     * @param jobEventStatus kind of job event received
     * @param startTime actual start time of the job, may be null
     * @param endTime actual end time of the job, may be null
     * @return true if the job is SLA-tracked (in the map or the history set), false otherwise
     * @throws JPAExecutorException if a DB operation fails or the summary of a history job is missing
     * @throws ServiceException declared for callers; not thrown directly here
     */
    @Override
    public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
            Date endTime) throws JPAExecutorException, ServiceException {
        SLACalcStatus slaCalc = slaMap.get(jobId);
        SLASummaryBean slaInfo = null;
        boolean hasSla = false;
        if (slaCalc != null) {
            synchronized (slaCalc) {
                slaCalc.setJobStatus(jobStatus);
                switch (jobEventStatus) {
                    case STARTED:
                        slaInfo = processJobStartSLA(slaCalc, startTime);
                        break;
                    case SUCCESS:
                        slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
                        break;
                    case FAILURE:
                        slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
                        break;
                    default:
                        LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
                        slaInfo = getSLASummaryBean(slaCalc);
                }

                if (slaCalc.getEventProcessed() == 7) {
                    // everything processed and the final status is known: stop tracking
                    slaInfo.setEventProcessed(8);
                    slaMap.remove(jobId);
                }
                hasSla = true;
            }
            LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
        }
        else if (historySet.contains(jobId)) {
            slaInfo = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
            if (slaInfo == null) {
                throw new JPAExecutorException(ErrorCode.E0604, jobId);
            }
            slaInfo.setJobStatus(jobStatus);
            slaInfo.setActualStart(startTime);
            slaInfo.setActualEnd(endTime);
            // a job that ended without ever starting has no start time, hence no duration
            if (startTime != null && endTime != null) {
                slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
            }
            slaInfo.setEventProcessed(8);
            historySet.remove(jobId);
            hasSla = true;
        }

        if (hasSla) {
            jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaInfo));
        }
        return hasSla;
    }

    /**
     * Process SLA for jobs that started running. Also update actual-start time.
     * If the start condition is unprocessed, a START_MET/START_MISS event is
     * queued (only when an expected start was registered) and the
     * start-processed bit is set.
     *
     * @param slaCalc SLA state of the job, updated in place
     * @param actualStart actual start time of the job
     * @return SLASummaryBean reflecting the updated state
     */
    private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) {
        slaCalc.setActualStart(actualStart);
        if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
        }
        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
        Date expecStart = reg.getExpectedStart();
        byte eventProc = slaCalc.getEventProcessed();
        // set event proc here
        if (((eventProc & 1) == 0)) {
            if (expecStart != null) {
                if (actualStart.getTime() > expecStart.getTime()) {
                    slaCalc.setEventStatus(EventStatus.START_MISS);
                }
                else {
                    slaCalc.setEventStatus(EventStatus.START_MET);
                }
                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
            }
            eventProc += 1;
            slaCalc.setEventProcessed(eventProc);
        }
        return getSLASummaryBean(slaCalc);
    }

    /**
     * Process SLA for jobs that ended successfully. Also update actual-start
     * and end time. Unprocessed duration and end conditions are evaluated
     * against the actual times and their events are queued.
     *
     * @param slaCalc SLA state of the job, updated in place
     * @param actualStart actual start time of the job
     * @param actualEnd actual end time of the job
     * @return SLASummaryBean reflecting the updated state
     * @throws JPAExecutorException declared for callers; not thrown directly here
     */
    private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd)
            throws JPAExecutorException {
        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
        slaCalc.setActualStart(actualStart);
        slaCalc.setActualEnd(actualEnd);
        long expectedDuration = reg.getExpectedDuration();
        long actualDuration = actualEnd.getTime() - actualStart.getTime();
        slaCalc.setActualDuration(actualDuration);
        //check event proc
        byte eventProc = slaCalc.getEventProcessed();
        if (((eventProc >> 1) & 1) == 0) {
            processDurationSLA(expectedDuration, actualDuration, slaCalc);
            eventProc += 2;
            slaCalc.setEventProcessed(eventProc);
        }

        if (eventProc < 4) {
            Date expectedEnd = reg.getExpectedEnd();
            if (actualEnd.getTime() > expectedEnd.getTime()) {
                slaCalc.setEventStatus(EventStatus.END_MISS);
                slaCalc.setSLAStatus(SLAStatus.MISS);
            }
            else {
                slaCalc.setEventStatus(EventStatus.END_MET);
                slaCalc.setSLAStatus(SLAStatus.MET);
            }
            eventProc += 4;
            slaCalc.setEventProcessed(eventProc);
            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
        }
        return getSLASummaryBean(slaCalc);
    }

    /**
     * Process SLA for jobs that ended in failure. Also update actual-start and
     * end time. A failed job is always an END_MISS, and a DURATION_MISS when a
     * duration SLA was registered and not yet processed.
     *
     * @param slaCalc SLA state of the job, updated in place
     * @param actualStart actual start time of the job, null if it failed before starting
     * @param actualEnd actual end time of the job
     * @return SLASummaryBean reflecting the updated state
     * @throws JPAExecutorException declared for callers; not thrown directly here
     */
    private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd)
            throws JPAExecutorException {
        slaCalc.setActualStart(actualStart);
        slaCalc.setActualEnd(actualEnd);
        if (actualStart == null) { // job failed before starting
            if (slaCalc.getEventProcessed() < 4) {
                slaCalc.setEventStatus(EventStatus.END_MISS);
                slaCalc.setSLAStatus(SLAStatus.MISS);
                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                slaCalc.setEventProcessed(7);
                return getSLASummaryBean(slaCalc);
            }
        }
        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
        long expectedDuration = reg.getExpectedDuration();
        // The job may have failed before starting while its end condition was already
        // processed; without a start time there is no duration to record.
        if (actualStart != null && actualEnd != null) {
            slaCalc.setActualDuration(actualEnd.getTime() - actualStart.getTime());
        }

        byte eventProc = slaCalc.getEventProcessed();
        if (((eventProc >> 1) & 1) == 0) {
            if (expectedDuration != -1) {
                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
            }
            eventProc += 2;
            slaCalc.setEventProcessed(eventProc);
        }
        if (eventProc < 4) {
            slaCalc.setEventStatus(EventStatus.END_MISS);
            slaCalc.setSLAStatus(SLAStatus.MISS);
            eventProc += 4;
            slaCalc.setEventProcessed(eventProc);
            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
        }
        return getSLASummaryBean(slaCalc);
    }

    /**
     * Copies the fields persisted by the summary update (id, actual times,
     * SLA/event/job status and event processed) into a new summary bean.
     *
     * @param slaCalc SLA state to copy from
     * @return a new SLASummaryBean holding the copied values
     */
    private SLASummaryBean getSLASummaryBean(SLACalcStatus slaCalc) {
        SLASummaryBean slaSummaryBean = new SLASummaryBean();
        slaSummaryBean.setActualStart(slaCalc.getActualStart());
        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
        slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
        slaSummaryBean.setId(slaCalc.getId());
        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
        return slaSummaryBean;
    }

    /**
     * Queues a DURATION_MISS or DURATION_MET event by comparing the actual
     * duration with the expected one. Does nothing when no duration SLA was
     * registered (expected == -1).
     *
     * @param expected expected duration, -1 if none
     * @param actual actual duration
     * @param slaCalc SLA state of the job, its event status is updated in place
     */
    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
        if (expected != -1) {
            slaCalc.setEventStatus(actual > expected ? EventStatus.DURATION_MISS : EventStatus.DURATION_MET);
            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
        }
    }

    /*
     * Confirm alerts against source of truth - DB. Also required in case of High Availability.
     *
     * Reloads the job from the DB, refreshes actual start/end and job status on slaCalc and then:
     * - if the job has ended: sets the final SLA status, queues the events of every condition
     *   not processed yet and sets event processed to 8;
     * - otherwise: queues miss/met events for the conditions whose deadline has passed and
     *   advances event processed accordingly.
     * If anything fails, the error is logged and, when the expected end has passed and the
     * end condition is unprocessed, an END_MISS is raised.
     */
    private void confirmWithDB(SLACalcStatus slaCalc) {
        boolean ended = false, isEndMiss = false;
        try {
            switch (slaCalc.getAppType()) {
                case WORKFLOW_JOB:
                    WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId()));
                    if (wf.getEndTime() != null) {
                        ended = true;
                        if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
                                || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
                            isEndMiss = true;
                        }
                    }
                    slaCalc.setActualStart(wf.getStartTime());
                    slaCalc.setActualEnd(wf.getEndTime());
                    slaCalc.setJobStatus(wf.getStatusStr());
                    break;
                case WORKFLOW_ACTION:
                    WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId()));
                    if (wa.getEndTime() != null) {
                        ended = true;
                        if (wa.isTerminalWithFailure()
                                || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
                            isEndMiss = true;
                        }
                    }
                    slaCalc.setActualStart(wa.getStartTime());
                    slaCalc.setActualEnd(wa.getEndTime());
                    slaCalc.setJobStatus(wa.getStatusStr());
                    break;
                case COORDINATOR_ACTION:
                    CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
                    if (ca.isTerminalWithFailure()) {
                        isEndMiss = ended = true;
                        slaCalc.setActualStart(null);
                        slaCalc.setActualEnd(ca.getLastModifiedTime());
                    }
                    if (ca.getExternalId() != null) {
                        // times of the child workflow take precedence over the action's own
                        wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId()));
                        if (wf.getEndTime() != null) {
                            ended = true;
                            if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
                                isEndMiss = true;
                            }
                        }
                        slaCalc.setActualEnd(wf.getEndTime());
                        slaCalc.setActualStart(wf.getStartTime());
                    }
                    slaCalc.setJobStatus(ca.getStatusStr());
                    break;
                default:
                    LOG.debug("Unsupported App-type for SLA - " + slaCalc.getAppType());
            }

            byte eventProc = slaCalc.getEventProcessed();
            if (ended) {
                if (isEndMiss) {
                    slaCalc.setSLAStatus(SLAStatus.MISS);
                }
                else {
                    slaCalc.setSLAStatus(SLAStatus.MET);
                }
                if (slaCalc.getActualStart() != null) {
                    if ((eventProc & 1) == 0) {
                        if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
                            slaCalc.setEventStatus(EventStatus.START_MISS);
                        }
                        else {
                            slaCalc.setEventStatus(EventStatus.START_MET);
                        }
                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                    }
                    slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
                    if (((eventProc >> 1) & 1) == 0) {
                        processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
                    }
                }
                if (eventProc < 4) {
                    if (isEndMiss) {
                        slaCalc.setEventStatus(EventStatus.END_MISS);
                    }
                    else {
                        slaCalc.setEventStatus(EventStatus.END_MET);
                    }
                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                }
                // job is over: no further SLA processing for it
                slaCalc.setEventProcessed(8);
            }
            else {
                if (slaCalc.getActualStart() != null) {
                    slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
                }
                if ((eventProc & 1) == 0) {
                    if (slaCalc.getActualStart() != null) {
                        if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
                            slaCalc.setEventStatus(EventStatus.START_MISS);
                        }
                        else {
                            slaCalc.setEventStatus(EventStatus.START_MET);
                        }
                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                        eventProc++;
                    }
                    else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
                        slaCalc.setEventStatus(EventStatus.START_MISS);
                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                        eventProc++;
                    }
                }
                if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != null
                        && slaCalc.getExpectedDuration() != -1) {
                    if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
                        slaCalc.setEventStatus(EventStatus.DURATION_MISS);
                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                        eventProc += 2;
                    }
                }
                if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
                    slaCalc.setEventStatus(EventStatus.END_MISS);
                    slaCalc.setSLAStatus(SLAStatus.MISS);
                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                    eventProc += 4;
                }
                slaCalc.setEventProcessed(eventProc);
            }
        }
        catch (Exception e) {
            LOG.warn("Error while confirming SLA against DB for jobid= " + slaCalc.getId() + ". Exception is "
                    + e.getClass().getName() + ": " + e.getMessage());
            if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
                slaCalc.setEventStatus(EventStatus.END_MISS);
                slaCalc.setSLAStatus(SLAStatus.MISS);
                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
                slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4);
            }
        }
    }

}