001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.sla; 019 020import java.sql.Timestamp; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.Date; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import java.util.concurrent.ConcurrentHashMap; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.oozie.AppType; 033import org.apache.oozie.CoordinatorActionBean; 034import org.apache.oozie.CoordinatorJobBean; 035import org.apache.oozie.ErrorCode; 036import org.apache.oozie.WorkflowActionBean; 037import org.apache.oozie.WorkflowJobBean; 038import org.apache.oozie.XException; 039import org.apache.oozie.client.CoordinatorAction; 040import org.apache.oozie.client.WorkflowAction; 041import org.apache.oozie.client.WorkflowJob; 042import org.apache.oozie.client.event.JobEvent; 043import org.apache.oozie.client.event.SLAEvent.EventStatus; 044import org.apache.oozie.client.event.SLAEvent.SLAStatus; 045import org.apache.oozie.client.rest.JsonBean; 046import org.apache.oozie.executor.jpa.BatchQueryExecutor; 047import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor; 048import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; 049import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 050import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; 051import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; 052import org.apache.oozie.executor.jpa.JPAExecutorException; 053import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; 054import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; 055import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 056import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; 057import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; 058import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor; 059import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor; 060import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; 061import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 062import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor; 063import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; 064import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 065import org.apache.oozie.lock.LockToken; 066import org.apache.oozie.service.EventHandlerService; 067import org.apache.oozie.service.JPAService; 068import org.apache.oozie.service.JobsConcurrencyService; 069import org.apache.oozie.service.MemoryLocksService; 070import org.apache.oozie.service.SchedulerService; 071import org.apache.oozie.service.ServiceException; 072import org.apache.oozie.service.Services; 073import org.apache.oozie.sla.service.SLAService; 074import org.apache.oozie.util.DateUtils; 075import org.apache.oozie.util.LogUtils; 076import org.apache.oozie.util.XLog; 077 078import com.google.common.annotations.VisibleForTesting; 079 080 081/** 082 * Implementation class for SLACalculator that calculates SLA related to 083 * start/end/duration of jobs using a memory-based map 084 */ 085public class SLACalculatorMemory implements SLACalculator { 086 087 private static XLog LOG = XLog.getLog(SLACalculatorMemory.class); 088 // TODO optimization priority based insertion/processing/bumping up-down 089 protected Map<String, SLACalcStatus> slaMap; 090 protected Set<String> historySet; 091 private static int capacity; 092 private static JPAService jpaService; 093 protected EventHandlerService eventHandler; 094 private static int modifiedAfter; 095 private static long jobEventLatency; 096 097 @Override 098 public void init(Configuration conf) throws ServiceException { 099 capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000); 100 jobEventLatency = conf.getInt(SLAService.CONF_JOB_EVENT_LATENCY, 90 * 1000); 101 slaMap = new ConcurrentHashMap<String, SLACalcStatus>(); 102 historySet = Collections.synchronizedSet(new HashSet<String>()); 103 jpaService = Services.get().get(JPAService.class); 104 eventHandler = Services.get().get(EventHandlerService.class); 105 // load events modified after 106 modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7); 107 loadOnRestart(); 108 Runnable purgeThread = new HistoryPurgeWorker(); 109 // schedule runnable by default 1 day 110 Services.get() 111 .get(SchedulerService.class) 112 .schedule(purgeThread, 86400, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 86400), 113 SchedulerService.Unit.SEC); 114 } 115 116 public class HistoryPurgeWorker implements Runnable { 117 118 public HistoryPurgeWorker() { 119 } 120 121 @Override 122 public void run() { 123 if (Thread.currentThread().isInterrupted()) { 124 return; 125 } 126 Iterator<String> jobItr = historySet.iterator(); 127 while (jobItr.hasNext()) { 128 String jobId = jobItr.next(); 129 130 if (jobId.endsWith("-W")) { 131 WorkflowJobBean wfJob = null; 132 try { 133 wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId); 134 } 135 catch (JPAExecutorException e) { 136 if (e.getErrorCode().equals(ErrorCode.E0604)) { 137 jobItr.remove(); 138 } 139 else { 140 LOG.info("Failed to fetch the workflow job: " + jobId, e); 141 } 142 } 143 if (wfJob != null && wfJob.inTerminalState()) { 144 try { 145 updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime()); 146 jobItr.remove(); 147 } 148 catch (JPAExecutorException e) { 149 LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e); 150 } 151 152 } 153 } 154 else if (jobId.contains("-W@")) { 155 WorkflowActionBean wfAction = null; 156 try { 157 wfAction = WorkflowActionQueryExecutor.getInstance().get( 158 WorkflowActionQuery.GET_ACTION_COMPLETED, jobId); 159 } 160 catch (JPAExecutorException e) { 161 if (e.getErrorCode().equals(ErrorCode.E0605)) { 162 jobItr.remove(); 163 } 164 else { 165 LOG.info("Failed to fetch the workflow action: " + jobId, e); 166 } 167 } 168 if (wfAction != null && (wfAction.isComplete() || wfAction.isTerminalWithFailure())) { 169 try { 170 updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime()); 171 jobItr.remove(); 172 } 173 catch (JPAExecutorException e) { 174 LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e); 175 } 176 } 177 } 178 else if (jobId.contains("-C@")) { 179 CoordinatorActionBean cAction = null; 180 try { 181 cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId); 182 } 183 catch (JPAExecutorException e) { 184 if (e.getErrorCode().equals(ErrorCode.E0605)) { 185 jobItr.remove(); 186 } 187 else { 188 LOG.info("Failed to fetch the coord action: " + jobId, e); 189 } 190 } 191 if (cAction != null && cAction.isTerminalStatus()) { 192 try { 193 updateSLASummaryForCoordAction(cAction); 194 jobItr.remove(); 195 } 196 catch (JPAExecutorException e) { 197 XLog.getLog(SLACalculatorMemory.class).info( 198 "Failed to update SLASummaryBean when purging history set entry for " + jobId, e); 199 } 200 201 } 202 } 203 else if (jobId.endsWith("-C")) { 204 CoordinatorJobBean cJob = null; 205 try { 206 cJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID, 207 jobId); 208 } 209 catch (JPAExecutorException e) { 210 if (e.getErrorCode().equals(ErrorCode.E0604)) { 211 jobItr.remove(); 212 } 213 else { 214 LOG.info("Failed to fetch the coord job: " + jobId, e); 215 } 216 } 217 if (cJob != null && cJob.isTerminalStatus()) { 218 try { 219 updateSLASummary(cJob.getId(), cJob.getStartTime(), cJob.getEndTime()); 220 jobItr.remove(); 221 } 222 catch (JPAExecutorException e) { 223 LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e); 224 } 225 226 } 227 } 228 } 229 } 230 231 private void updateSLASummary(String id, Date startTime, Date endTime) throws JPAExecutorException { 232 SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id); 233 if (sla != null) { 234 sla.setActualStart(startTime); 235 sla.setActualEnd(endTime); 236 if (startTime != null && endTime != null) { 237 sla.setActualDuration(endTime.getTime() - startTime.getTime()); 238 } 239 sla.setLastModifiedTime(new Date()); 240 sla.setEventProcessed(8); 241 SLASummaryQueryExecutor.getInstance().executeUpdate( 242 SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, sla); 243 } 244 } 245 246 private void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException { 247 String wrkflowId = bean.getExternalId(); 248 if (wrkflowId != null) { 249 WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get( 250 WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId); 251 if (wrkflow != null) { 252 updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime()); 253 } 254 } 255 } 256 } 257 258 private void loadOnRestart() { 259 boolean isJobModified = false; 260 try { 261 long slaPendingCount = 0; 262 long statusPendingCount = 0; 263 List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor( 264 modifiedAfter)); 265 for (SLASummaryBean summaryBean : summaryBeans) { 266 String jobId = summaryBean.getId(); 267 LockToken lock = null; 268 switch (summaryBean.getAppType()) { 269 case COORDINATOR_ACTION: 270 isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId); 271 break; 272 case WORKFLOW_ACTION: 273 isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId); 274 break; 275 case WORKFLOW_JOB: 276 isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId); 277 break; 278 default: 279 break; 280 } 281 if (isJobModified) { 282 try { 283 boolean update = true; 284 if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) { 285 lock = Services 286 .get() 287 .get(MemoryLocksService.class) 288 .getWriteLock( 289 SLACalcStatus.SLA_ENTITYKEY_PREFIX + jobId, 290 Services.get().getConf() 291 .getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000)); 292 if (lock == null) { 293 update = false; 294 } 295 } 296 if (update) { 297 summaryBean.setLastModifiedTime(new Date()); 298 SLASummaryQueryExecutor.getInstance().executeUpdate( 299 SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean); 300 } 301 } 302 catch (Exception e) { 303 LOG.warn("Failed to load records for " + jobId, e); 304 } 305 finally { 306 if (lock != null) { 307 lock.release(); 308 lock = null; 309 } 310 } 311 } 312 try { 313 if (summaryBean.getEventProcessed() == 7) { 314 historySet.add(jobId); 315 statusPendingCount++; 316 } 317 else if (summaryBean.getEventProcessed() <= 7) { 318 SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( 319 SLARegQuery.GET_SLA_REG_ON_RESTART, jobId); 320 SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean); 321 slaMap.put(jobId, slaCalcStatus); 322 slaPendingCount++; 323 } 324 } 325 catch (Exception e) { 326 LOG.warn("Failed to fetch/update records for " + jobId, e); 327 } 328 329 } 330 LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount); 331 332 } 333 catch (Exception e) { 334 LOG.warn("Failed to retrieve SLASummary records on restart", e); 335 } 336 } 337 338 private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId) 339 throws JPAExecutorException { 340 boolean isJobModified = false; 341 CoordinatorActionBean coordAction = null; 342 coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId)); 343 if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) { 344 LOG.trace("Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is " 345 + summaryBean.getJobStatus()); 346 isJobModified = true; 347 summaryBean.setJobStatus(coordAction.getStatusStr()); 348 if (coordAction.isTerminalStatus()) { 349 WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction 350 .getExternalId())); 351 setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(), 352 coordAction.getStatusStr()); 353 } 354 else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) { 355 WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction 356 .getExternalId())); 357 setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime()); 358 } 359 } 360 return isJobModified; 361 } 362 363 private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId) 364 throws JPAExecutorException { 365 boolean isJobModified = false; 366 WorkflowActionBean wfAction = null; 367 wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId)); 368 if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) { 369 LOG.trace("Workflow action status is " + wfAction.getStatusStr() + "and summary bean status is " 370 + summaryBean.getJobStatus()); 371 isJobModified = true; 372 summaryBean.setJobStatus(wfAction.getStatusStr()); 373 if (wfAction.inTerminalState()) { 374 setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr()); 375 } 376 else if (wfAction.getStatus() != WorkflowAction.Status.PREP) { 377 setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime()); 378 } 379 } 380 return isJobModified; 381 } 382 383 private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId) 384 throws JPAExecutorException { 385 boolean isJobModified = false; 386 WorkflowJobBean wfJob = null; 387 wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId)); 388 if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) { 389 LOG.trace("Workflow job status is " + wfJob.getStatusStr() + "and summary bean status is " 390 + summaryBean.getJobStatus()); 391 isJobModified = true; 392 summaryBean.setJobStatus(wfJob.getStatusStr()); 393 if (wfJob.inTerminalState()) { 394 setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr()); 395 } 396 else if (wfJob.getStatus() != WorkflowJob.Status.PREP) { 397 setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime()); 398 } 399 } 400 return isJobModified; 401 } 402 403 private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) { 404 byte eventProc = summaryBean.getEventProcessed(); 405 summaryBean.setEventProcessed(8); 406 summaryBean.setActualStart(startTime); 407 summaryBean.setActualEnd(endTime); 408 long actualDuration = endTime.getTime() - startTime.getTime(); 409 summaryBean.setActualDuration(actualDuration); 410 if (eventProc < 4) { 411 if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || status.equals(WorkflowAction.Status.OK.name()) 412 || status.equals(CoordinatorAction.Status.SUCCEEDED.name())) { 413 if (endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) { 414 summaryBean.setSLAStatus(SLAStatus.MET); 415 } 416 else { 417 summaryBean.setSLAStatus(SLAStatus.MISS); 418 } 419 } 420 else { 421 summaryBean.setSLAStatus(SLAStatus.MISS); 422 } 423 } 424 425 } 426 427 private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) { 428 if (((eventProc & 1) == 0)) { 429 eventProc += 1; 430 summaryBean.setEventProcessed(eventProc); 431 } 432 if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) { 433 summaryBean.setSLAStatus(SLAStatus.IN_PROCESS); 434 } 435 summaryBean.setActualStart(startTime); 436 } 437 438 @Override 439 public int size() { 440 return slaMap.size(); 441 } 442 443 @Override 444 public SLACalcStatus get(String jobId) throws JPAExecutorException { 445 SLACalcStatus memObj; 446 memObj = slaMap.get(jobId); 447 if (memObj == null && historySet.contains(jobId)) { 448 memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), 449 SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); 450 } 451 return memObj; 452 } 453 454 @Override 455 public Iterator<String> iterator() { 456 return slaMap.keySet().iterator(); 457 } 458 459 @Override 460 public boolean isEmpty() { 461 return slaMap.isEmpty(); 462 } 463 464 @Override 465 public void clear() { 466 slaMap.clear(); 467 historySet.clear(); 468 } 469 470 /** 471 * Invoked via periodic run, update the SLA for registered jobs 472 */ 473 protected void updateJobSla(String jobId) throws Exception { 474 SLACalcStatus slaCalc = slaMap.get(jobId); 475 synchronized (slaCalc) { 476 boolean change = false; 477 // get eventProcessed on DB for validation in HA 478 Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue( 479 SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId); 480 byte eventProc = ((Byte) eventProcObj).byteValue(); 481 if (eventProc >= 7) { 482 if (eventProc == 7) { 483 historySet.add(jobId); 484 } 485 slaMap.remove(jobId); 486 LOG.trace("Removed Job [{0}] from map as SLA processed", jobId); 487 } 488 else { 489 slaCalc.setEventProcessed(eventProc); 490 SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); 491 // calculation w.r.t current time and status 492 if ((eventProc & 1) == 0) { // first bit (start-processed) unset 493 if (reg.getExpectedStart() != null) { 494 if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) { 495 confirmWithDB(slaCalc); 496 eventProc = slaCalc.getEventProcessed(); 497 if (eventProc != 8 && (eventProc & 1) == 0) { 498 // Some DB exception 499 slaCalc.setEventStatus(EventStatus.START_MISS); 500 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 501 eventProc++; 502 } 503 change = true; 504 } 505 } 506 else { 507 eventProc++; // disable further processing for optional start sla condition 508 change = true; 509 } 510 } 511 // check if second bit (duration-processed) is unset 512 if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { 513 if (reg.getExpectedDuration() == -1) { 514 eventProc += 2; 515 change = true; 516 } 517 else if (slaCalc.getActualStart() != null) { 518 if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc 519 .getActualStart().getTime())) { 520 slaCalc.setEventProcessed(eventProc); 521 confirmWithDB(slaCalc); 522 eventProc = slaCalc.getEventProcessed(); 523 if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { 524 // Some DB exception 525 slaCalc.setEventStatus(EventStatus.DURATION_MISS); 526 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 527 eventProc += 2; 528 } 529 change = true; 530 } 531 } 532 } 533 if (eventProc < 4) { 534 if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) { 535 slaCalc.setEventProcessed(eventProc); 536 confirmWithDB(slaCalc); 537 eventProc = slaCalc.getEventProcessed(); 538 change = true; 539 } 540 } 541 if (change) { 542 try { 543 boolean locked = true; 544 slaCalc.acquireLock(); 545 locked = slaCalc.isLocked(); 546 if (locked) { 547 // no more processing, no transfer to history set 548 if (slaCalc.getEventProcessed() >= 8) { 549 eventProc = 8; 550 // Should not be > 8. But to handle any corner cases 551 slaCalc.setEventProcessed(8); 552 slaMap.remove(jobId); 553 } 554 else { 555 slaCalc.setEventProcessed(eventProc); 556 } 557 SLASummaryBean slaSummaryBean = new SLASummaryBean(); 558 slaSummaryBean.setId(slaCalc.getId()); 559 slaSummaryBean.setEventProcessed(eventProc); 560 slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus()); 561 slaSummaryBean.setEventStatus(slaCalc.getEventStatus()); 562 slaSummaryBean.setActualEnd(slaCalc.getActualEnd()); 563 slaSummaryBean.setActualStart(slaCalc.getActualStart()); 564 slaSummaryBean.setActualDuration(slaCalc.getActualDuration()); 565 slaSummaryBean.setJobStatus(slaCalc.getJobStatus()); 566 slaSummaryBean.setLastModifiedTime(new Date()); 567 SLASummaryQueryExecutor.getInstance().executeUpdate( 568 SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean); 569 if (eventProc == 7) { 570 historySet.add(jobId); 571 slaMap.remove(jobId); 572 LOG.trace("Removed Job [{0}] from map after End-processed", jobId); 573 } 574 } 575 } 576 catch (InterruptedException e) { 577 throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut()); 578 } 579 finally { 580 slaCalc.releaseLock(); 581 } 582 } 583 } 584 } 585 } 586 587 /** 588 * Periodically run by the SLAService worker threads to update SLA status by 589 * iterating through all the jobs in the map 590 */ 591 @Override 592 public void updateAllSlaStatus() { 593 LOG.info("Running periodic SLA check"); 594 Iterator<String> iterator = slaMap.keySet().iterator(); 595 while (iterator.hasNext()) { 596 String jobId = iterator.next(); 597 try { 598 LOG.trace("Processing SLA for jobid={0}", jobId); 599 updateJobSla(jobId); 600 } 601 catch (Exception e) { 602 setLogPrefix(jobId); 603 LOG.error("Exception in SLA processing for job [{0}]", jobId, e); 604 LogUtils.clearLogPrefix(); 605 } 606 } 607 } 608 609 /** 610 * Register a new job into the map for SLA tracking 611 */ 612 @Override 613 public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException { 614 try { 615 if (slaMap.size() < capacity) { 616 SLACalcStatus slaCalc = new SLACalcStatus(reg); 617 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); 618 slaCalc.setJobStatus(getJobStatus(reg.getAppType())); 619 slaMap.put(jobId, slaCalc); 620 List<JsonBean> insertList = new ArrayList<JsonBean>(); 621 final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc); 622 final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date()); 623 reg.setCreatedTimestamp(currentTime); 624 summaryBean.setCreatedTimestamp(currentTime); 625 insertList.add(reg); 626 insertList.add(summaryBean); 627 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); 628 LOG.trace("SLA Registration Event - Job:" + jobId); 629 return true; 630 } 631 else { 632 setLogPrefix(reg.getId()); 633 LOG.error( 634 "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]", 635 reg.getId()); 636 LogUtils.clearLogPrefix(); 637 } 638 } 639 catch (JPAExecutorException jpa) { 640 throw jpa; 641 } 642 return false; 643 } 644 645 private String getJobStatus(AppType appType) { 646 String status = null; 647 switch (appType) { 648 case COORDINATOR_ACTION: 649 status = CoordinatorAction.Status.WAITING.name(); 650 break; 651 case WORKFLOW_ACTION: 652 status = WorkflowAction.Status.PREP.name(); 653 break; 654 case WORKFLOW_JOB: 655 status = WorkflowJob.Status.PREP.name(); 656 break; 657 default: 658 break; 659 } 660 return status; 661 } 662 663 /** 664 * Update job into the map for SLA tracking 665 */ 666 @Override 667 public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException { 668 try { 669 if (slaMap.size() < capacity) { 670 SLACalcStatus slaCalc = new SLACalcStatus(reg); 671 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); 672 slaCalc.setJobStatus(getJobStatus(reg.getAppType())); 673 slaMap.put(jobId, slaCalc); 674 List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 675 updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg)); 676 updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, 677 new SLASummaryBean(slaCalc))); 678 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 679 LOG.trace("SLA Registration Event - Job:" + jobId); 680 return true; 681 } 682 else { 683 setLogPrefix(reg.getId()); 684 LOG.error( 685 "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]", 686 reg.getId()); 687 LogUtils.clearLogPrefix(); 688 } 689 } 690 catch (JPAExecutorException jpa) { 691 throw jpa; 692 } 693 return false; 694 } 695 696 /** 697 * Remove job from being tracked in map 698 */ 699 @Override 700 public void removeRegistration(String jobId) { 701 if (slaMap.remove(jobId) == null) { 702 historySet.remove(jobId); 703 } 704 } 705 706 /** 707 * Triggered after receiving Job status change event, update SLA status 708 * accordingly 709 */ 710 @Override 711 public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime, 712 Date endTime) throws JPAExecutorException, ServiceException { 713 SLACalcStatus slaCalc = slaMap.get(jobId); 714 SLASummaryBean slaInfo = null; 715 boolean hasSla = false; 716 if (slaCalc == null) { 717 if (historySet.contains(jobId)) { 718 slaInfo = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); 719 if (slaInfo == null) { 720 throw new JPAExecutorException(ErrorCode.E0604, jobId); 721 } 722 slaInfo.setJobStatus(jobStatus); 723 slaInfo.setActualStart(startTime); 724 slaInfo.setActualEnd(endTime); 725 if (endTime != null) { 726 slaInfo.setActualDuration(endTime.getTime() - startTime.getTime()); 727 } 728 slaInfo.setEventProcessed(8); 729 historySet.remove(jobId); 730 slaInfo.setLastModifiedTime(new Date()); 731 SLASummaryQueryExecutor.getInstance().executeUpdate( 732 SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo); 733 hasSla = true; 734 } 735 else if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) { 736 // jobid might not exist in slaMap in HA Setting 737 SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( 738 SLARegQuery.GET_SLA_REG_ALL, jobId); 739 if (slaRegBean != null) { // filter out jobs picked by SLA job event listener 740 // but not actually configured for SLA 741 SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get( 742 SLASummaryQuery.GET_SLA_SUMMARY, jobId); 743 if (slaSummaryBean.getEventProcessed() < 7) { 744 slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean); 745 slaMap.put(jobId, slaCalc); 746 } 747 } 748 } 749 } 750 if (slaCalc != null) { 751 synchronized (slaCalc) { 752 try { 753 // only get ZK lock when multiple servers running 754 boolean locked = true; 755 slaCalc.acquireLock(); 756 locked = slaCalc.isLocked(); 757 if (locked) { 758 // get eventProcessed on DB for validation in HA 759 Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()) 760 .getSingleValue(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId); 761 byte eventProc = ((Byte) eventProcObj).byteValue(); 762 slaCalc.setEventProcessed(eventProc); 763 slaCalc.setJobStatus(jobStatus); 764 switch (jobEventStatus) { 765 case STARTED: 766 slaInfo = processJobStartSLA(slaCalc, startTime); 767 break; 768 case SUCCESS: 769 slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime); 770 break; 771 case FAILURE: 772 slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime); 773 break; 774 default: 775 LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus); 776 slaInfo = getSLASummaryBean(slaCalc); 777 } 778 if (slaCalc.getEventProcessed() == 7) { 779 slaInfo.setEventProcessed(8); 780 slaMap.remove(jobId); 781 } 782 slaInfo.setLastModifiedTime(new Date()); 783 SLASummaryQueryExecutor.getInstance().executeUpdate( 784 SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo); 785 hasSla = true; 786 } 787 } 788 catch (InterruptedException e) { 789 throw new ServiceException(ErrorCode.E0606, slaCalc.getEntityKey(), slaCalc.getLockTimeOut()); 790 } 791 finally { 792 slaCalc.releaseLock(); 793 } 794 } 795 LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus()); 796 } 797 798 return hasSla; 799 } 800 801 /** 802 * Process SLA for jobs that started running. Also update actual-start time 803 * 804 * @param slaCalc 805 * @param actualStart 806 * @return SLASummaryBean 807 */ 808 private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) { 809 slaCalc.setActualStart(actualStart); 810 if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) { 811 slaCalc.setSLAStatus(SLAStatus.IN_PROCESS); 812 } 813 SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); 814 Date expecStart = reg.getExpectedStart(); 815 byte eventProc = slaCalc.getEventProcessed(); 816 // set event proc here 817 if (((eventProc & 1) == 0)) { 818 if (expecStart != null) { 819 if (actualStart.getTime() > expecStart.getTime()) { 820 slaCalc.setEventStatus(EventStatus.START_MISS); 821 } 822 else { 823 slaCalc.setEventStatus(EventStatus.START_MET); 824 } 825 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 826 } 827 eventProc += 1; 828 slaCalc.setEventProcessed(eventProc); 829 } 830 return getSLASummaryBean(slaCalc); 831 } 832 833 /** 834 * Process SLA for jobs that ended successfully. Also update actual-start 835 * and end time 836 * 837 * @param slaCalc 838 * @param actualStart 839 * @param actualEnd 840 * @return SLASummaryBean 841 * @throws JPAExecutorException 842 */ 843 private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException { 844 SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); 845 slaCalc.setActualStart(actualStart); 846 slaCalc.setActualEnd(actualEnd); 847 long expectedDuration = reg.getExpectedDuration(); 848 long actualDuration = actualEnd.getTime() - actualStart.getTime(); 849 slaCalc.setActualDuration(actualDuration); 850 //check event proc 851 byte eventProc = slaCalc.getEventProcessed(); 852 if (((eventProc >> 1) & 1) == 0) { 853 processDurationSLA(expectedDuration, actualDuration, slaCalc); 854 eventProc += 2; 855 slaCalc.setEventProcessed(eventProc); 856 } 857 858 if (eventProc < 4) { 859 Date expectedEnd = reg.getExpectedEnd(); 860 if (actualEnd.getTime() > expectedEnd.getTime()) { 861 slaCalc.setEventStatus(EventStatus.END_MISS); 862 slaCalc.setSLAStatus(SLAStatus.MISS); 863 } 864 else { 865 slaCalc.setEventStatus(EventStatus.END_MET); 866 slaCalc.setSLAStatus(SLAStatus.MET); 867 } 868 eventProc += 4; 869 slaCalc.setEventProcessed(eventProc); 870 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 871 } 872 return getSLASummaryBean(slaCalc); 873 } 874 875 /** 876 * Process SLA for jobs that ended in failure. Also update actual-start and 877 * end time 878 * 879 * @param slaCalc 880 * @param actualStart 881 * @param actualEnd 882 * @return SLASummaryBean 883 * @throws JPAExecutorException 884 */ 885 private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException { 886 slaCalc.setActualStart(actualStart); 887 slaCalc.setActualEnd(actualEnd); 888 if (actualStart == null) { // job failed before starting 889 if (slaCalc.getEventProcessed() < 4) { 890 slaCalc.setEventStatus(EventStatus.END_MISS); 891 slaCalc.setSLAStatus(SLAStatus.MISS); 892 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 893 slaCalc.setEventProcessed(7); 894 return getSLASummaryBean(slaCalc); 895 } 896 } 897 SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); 898 long expectedDuration = reg.getExpectedDuration(); 899 long actualDuration = actualEnd.getTime() - actualStart.getTime(); 900 slaCalc.setActualDuration(actualDuration); 901 902 byte eventProc = slaCalc.getEventProcessed(); 903 if (((eventProc >> 1) & 1) == 0) { 904 if (expectedDuration != -1) { 905 slaCalc.setEventStatus(EventStatus.DURATION_MISS); 906 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 907 } 908 eventProc += 2; 909 slaCalc.setEventProcessed(eventProc); 910 } 911 if (eventProc < 4) { 912 slaCalc.setEventStatus(EventStatus.END_MISS); 913 slaCalc.setSLAStatus(SLAStatus.MISS); 914 eventProc += 4; 915 slaCalc.setEventProcessed(eventProc); 916 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 917 } 918 return getSLASummaryBean(slaCalc); 919 } 920 921 private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) { 922 SLASummaryBean slaSummaryBean = new SLASummaryBean(); 923 slaSummaryBean.setActualStart(slaCalc.getActualStart()); 924 slaSummaryBean.setActualEnd(slaCalc.getActualEnd()); 925 slaSummaryBean.setActualDuration(slaCalc.getActualDuration()); 926 slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus()); 927 slaSummaryBean.setEventStatus(slaCalc.getEventStatus()); 928 slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed()); 929 slaSummaryBean.setId(slaCalc.getId()); 930 slaSummaryBean.setJobStatus(slaCalc.getJobStatus()); 931 return slaSummaryBean; 932 } 933 934 private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) { 935 if (expected != -1 && actual > expected) { 936 slaCalc.setEventStatus(EventStatus.DURATION_MISS); 937 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 938 } 939 else if (expected != -1 && actual <= expected) { 940 slaCalc.setEventStatus(EventStatus.DURATION_MET); 941 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 942 } 943 } 944 945 /* 946 * Confirm alerts against source of truth - DB. Also required in case of High Availability 947 */ 948 private void confirmWithDB(SLACalcStatus slaCalc) { 949 boolean ended = false, isEndMiss = false; 950 try { 951 switch (slaCalc.getAppType()) { 952 case WORKFLOW_JOB: 953 WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId())); 954 if (wf.getEndTime() != null) { 955 ended = true; 956 if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED 957 || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { 958 isEndMiss = true; 959 } 960 } 961 slaCalc.setActualStart(wf.getStartTime()); 962 slaCalc.setActualEnd(wf.getEndTime()); 963 slaCalc.setJobStatus(wf.getStatusStr()); 964 break; 965 case WORKFLOW_ACTION: 966 WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId())); 967 if (wa.getEndTime() != null) { 968 ended = true; 969 if (wa.isTerminalWithFailure() 970 || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { 971 isEndMiss = true; 972 } 973 } 974 slaCalc.setActualStart(wa.getStartTime()); 975 slaCalc.setActualEnd(wa.getEndTime()); 976 slaCalc.setJobStatus(wa.getStatusStr()); 977 break; 978 case COORDINATOR_ACTION: 979 CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId())); 980 if (ca.isTerminalWithFailure()) { 981 isEndMiss = ended = true; 982 slaCalc.setActualEnd(ca.getLastModifiedTime()); 983 } 984 if (ca.getExternalId() != null) { 985 wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId())); 986 if (wf.getEndTime() != null) { 987 ended = true; 988 if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { 989 isEndMiss = true; 990 } 991 } 992 slaCalc.setActualEnd(wf.getEndTime()); 993 slaCalc.setActualStart(wf.getStartTime()); 994 } 995 slaCalc.setJobStatus(ca.getStatusStr()); 996 break; 997 default: 998 LOG.debug("Unsupported App-type for SLA - " + slaCalc.getAppType()); 999 } 1000 1001 byte eventProc = slaCalc.getEventProcessed(); 1002 if (ended) { 1003 if (isEndMiss) { 1004 slaCalc.setSLAStatus(SLAStatus.MISS); 1005 } 1006 else { 1007 slaCalc.setSLAStatus(SLAStatus.MET); 1008 } 1009 if (slaCalc.getActualStart() != null) { 1010 if ((eventProc & 1) == 0) { 1011 if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) { 1012 slaCalc.setEventStatus(EventStatus.START_MISS); 1013 } 1014 else { 1015 slaCalc.setEventStatus(EventStatus.START_MET); 1016 } 1017 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 1018 } 1019 slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime()); 1020 if (((eventProc >> 1) & 1) == 0) { 1021 processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc); 1022 } 1023 } 1024 if (eventProc < 4) { 1025 if (isEndMiss) { 1026 slaCalc.setEventStatus(EventStatus.END_MISS); 1027 } 1028 else { 1029 slaCalc.setEventStatus(EventStatus.END_MET); 1030 } 1031 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 1032 } 1033 slaCalc.setEventProcessed(8); 1034 } 1035 else { 1036 if (slaCalc.getActualStart() != null) { 1037 slaCalc.setSLAStatus(SLAStatus.IN_PROCESS); 1038 } 1039 if ((eventProc & 1) == 0) { 1040 if (slaCalc.getActualStart() != null) { 1041 if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) { 1042 slaCalc.setEventStatus(EventStatus.START_MISS); 1043 } 1044 else { 1045 slaCalc.setEventStatus(EventStatus.START_MET); 1046 } 1047 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 1048 eventProc++; 1049 } 1050 else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) { 1051 slaCalc.setEventStatus(EventStatus.START_MISS); 1052 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 1053 eventProc++; 1054 } 1055 } 1056 if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != null 1057 && slaCalc.getExpectedDuration() != -1) { 1058 if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) { 1059 slaCalc.setEventStatus(EventStatus.DURATION_MISS); 1060 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 1061 eventProc += 2; 1062 } 1063 } 1064 if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) { 1065 slaCalc.setEventStatus(EventStatus.END_MISS); 1066 slaCalc.setSLAStatus(SLAStatus.MISS); 1067 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 1068 eventProc += 4; 1069 } 1070 slaCalc.setEventProcessed(eventProc); 1071 } 1072 } 1073 catch (Exception e) { 1074 LOG.warn("Error while confirming SLA against DB for jobid= " + slaCalc.getId() + ". Exception is " 1075 + e.getClass().getName() + ": " + e.getMessage()); 1076 if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) { 1077 slaCalc.setEventStatus(EventStatus.END_MISS); 1078 slaCalc.setSLAStatus(SLAStatus.MISS); 1079 eventHandler.queueEvent(new SLACalcStatus(slaCalc)); 1080 slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4); 1081 } 1082 } 1083 } 1084 1085 @VisibleForTesting 1086 public boolean isJobIdInSLAMap(String jobId) { 1087 return this.slaMap.containsKey(jobId); 1088 } 1089 1090 @VisibleForTesting 1091 public boolean isJobIdInHistorySet(String jobId) { 1092 return this.historySet.contains(jobId); 1093 } 1094 1095 private void setLogPrefix(String jobId) { 1096 LOG = LogUtils.setLogInfo(LOG, jobId, null, null); 1097 } 1098}