/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.sla;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.service.SLAService;

import com.google.common.annotations.VisibleForTesting;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.Pair;
import org.apache.oozie.util.XLog;

/**
 * Implementation class for SLACalculator that calculates SLA related to
 * start/end/duration of jobs using a memory-based map.
 * <p>
 * Two in-memory structures are maintained:
 * <ul>
 * <li>{@link #slaMap}: job id to {@link SLACalcStatus} for jobs that are still being tracked;</li>
 * <li>{@link #historySet}: ids of jobs that were removed from the map with an event-processed value
 * of 7 and are later re-checked by {@link HistoryPurgeWorker}.</li>
 * </ul>
 * Every insertion into / removal from {@link #slaMap} is mirrored in the
 * {@link #INSTRUMENTATION_GROUP}/{@link #SLA_MAP} instrumentation counter.
 */
public class SLACalculatorMemory implements SLACalculator {

    private static XLog LOG = XLog.getLog(SLACalculatorMemory.class);
    // TODO optimization priority based insertion/processing/bumping up-down
    protected Map<String, SLACalcStatus> slaMap;
    protected Set<String> historySet;
    private static int capacity;
    private static JPAService jpaService;
    protected EventHandlerService eventHandler;
    private static int modifiedAfter;
    private static long jobEventLatency;
    private Instrumentation instrumentation;
    public static final String INSTRUMENTATION_GROUP = "sla-calculator";
    public static final String SLA_MAP = "sla-map";

    /**
     * Reads the SLA settings from the configuration, creates the in-memory structures, reloads the
     * SLA summary records from the database and schedules the periodic {@link HistoryPurgeWorker}.
     *
     * @param conf configuration the SLA settings are read from
     * @throws ServiceException declared by the {@link SLACalculator} contract
     */
    @Override
    public void init(Configuration conf) throws ServiceException {
        capacity = ConfigurationService.getInt(conf, SLAService.CONF_CAPACITY);
        jobEventLatency = ConfigurationService.getInt(conf, SLAService.CONF_JOB_EVENT_LATENCY);
        slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
        historySet = Collections.synchronizedSet(new HashSet<String>());
        jpaService = Services.get().get(JPAService.class);
        eventHandler = Services.get().get(EventHandlerService.class);
        instrumentation = Services.get().get(InstrumentationService.class).get();
        // load events modified after
        modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
        loadOnRestart();
        Runnable purgeThread = new HistoryPurgeWorker();
        // schedule the purge runnable; both the initial delay and the default interval are 3600 seconds
        Services.get()
                .get(SchedulerService.class)
                .schedule(purgeThread, 3600,
                        Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 3600),
                        SchedulerService.Unit.SEC);
    }

    /**
     * Periodic task that runs the SLA job-history command for every job id in {@link #historySet}
     * and drops the ids that are finished or no longer present in the database.
     */
    public class HistoryPurgeWorker extends Thread {

        public HistoryPurgeWorker() {
        }

        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            // historySet is a Collections.synchronizedSet: iterating it requires holding its monitor.
            // Take a snapshot under the lock so the (potentially slow) commands run without blocking
            // other threads that add to / remove from the set.
            final List<String> jobIds;
            synchronized (historySet) {
                jobIds = new ArrayList<String>(historySet);
            }
            for (String jobId : jobIds) {
                LOG.debug(" Running HistoryPurgeWorker for " + jobId);
                try {
                    boolean isDone = SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call();
                    if (isDone) {
                        LOG.debug("[{0}] job is finished and processed. Removing from history", jobId);
                        historySet.remove(jobId);
                    }
                }
                catch (CommandException e) {
                    if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
                        LOG.warn("Job is not found in db: " + jobId, e);
                        historySet.remove(jobId);
                    }
                    else {
                        LOG.error("Failed to fetch the job: " + jobId, e);
                    }
                }
            }
        }
    }

    /**
     * Loads the SLA summary records returned by {@link SLASummaryGetRecordsOnRestartJPAExecutor} into
     * {@link #slaMap}. Failures are logged and otherwise ignored so that initialization can continue.
     */
    private void loadOnRestart() {
        try {
            List<SLASummaryBean> summaryBeans = jpaService
                    .execute(new SLASummaryGetRecordsOnRestartJPAExecutor(modifiedAfter));
            for (SLASummaryBean summaryBean : summaryBeans) {
                String jobId = summaryBean.getId();
                putAndIncrement(jobId, new SLACalcStatus(summaryBean));
            }
            LOG.info("Loaded {0} SLASummary object after restart", slaMap.size());
        }
        catch (Exception e) {
            LOG.warn("Failed to retrieve SLASummary records on restart", e);
        }
    }

    /**
     * @return number of jobs currently tracked in the SLA map
     */
    @Override
    public int size() {
        return slaMap.size();
    }

    /**
     * @return the live history set (not a copy)
     */
    @VisibleForTesting
    public Set<String> getHistorySet() {
        return historySet;
    }

    /**
     * Returns the status tracked in the map for the job; when the job is not in the map but is in the
     * history set, a status is built from the summary and registration records read from the database.
     *
     * @param jobId job id
     * @return the SLA status, or {@code null} when the job is in neither the map nor the history set
     * @throws JPAExecutorException if a database query fails
     */
    @Override
    public SLACalcStatus get(String jobId) throws JPAExecutorException {
        SLACalcStatus memObj = slaMap.get(jobId);
        if (memObj == null && historySet.contains(jobId)) {
            memObj = new SLACalcStatus(
                    SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId),
                    SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
        }
        return memObj;
    }

    /**
     * Get SLACalcStatus from map if SLARegistration is not null, else create a new SLACalcStatus.
     * This function doesn't update slaMap.
     *
     * @param jobId job id
     * @return SLACalcStatus from map if its SLARegistration is not null, else a newly created SLACalcStatus
     * @throws JPAExecutorException if a database query fails
     */
    private SLACalcStatus getOrCreateSLACalcStatus(String jobId) throws JPAExecutorException {
        SLACalcStatus memObj = slaMap.get(jobId);
        // if the request came from immediately after restart don't use map SLACalcStatus.
        if (memObj == null || memObj.getSLARegistrationBean() == null) {
            SLARegistrationBean registrationBean = SLARegistrationQueryExecutor.getInstance()
                    .get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
            SLASummaryBean summaryBean = memObj == null
                    ? SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId)
                    : memObj.getSLASummaryBean();
            return new SLACalcStatus(summaryBean, registrationBean);
        }
        return memObj;
    }

    /**
     * @return iterator over the job ids currently tracked in the SLA map
     */
    @Override
    public Iterator<String> iterator() {
        return slaMap.keySet().iterator();
    }

    @Override
    public boolean isEmpty() {
        return slaMap.isEmpty();
    }

    /**
     * Empties both the SLA map and the history set and decrements the instrumentation counter by the
     * map size observed before clearing.
     */
    @Override
    public void clear() {
        final int originalSize = slaMap.size();
        slaMap.clear();
        historySet.clear();
        instrumentation.decr(INSTRUMENTATION_GROUP, SLA_MAP, originalSize);
    }

    /**
     * Invoked via periodic run, update the SLA for registered jobs.
     *
     * @param jobId id of the job to re-evaluate
     * @throws Exception if reading the SLA summary fails for a reason other than the record being absent
     */
    protected void updateJobSla(String jobId) throws Exception {
        SLACalcStatus slaCalc = slaMap.get(jobId);

        if (slaCalc == null) {
            // job might be processed and removed from map by addJobStatus
            return;
        }
        boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc);
        // get eventProcessed on DB for validation in HA
        SLASummaryBean summaryBean = null;
        try {
            summaryBean = SLASummaryQueryExecutor.getInstance()
                    .get(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
        }
        catch (JPAExecutorException e) {
            if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
                LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId);
                removeAndDecrement(jobId);
                return;
            }
            throw e;
        }
        byte eventProc = summaryBean.getEventProcessed();
        slaCalc.setEventProcessed(eventProc);
        if (eventProc >= 7) {
            // NOTE(review): 7 presumably means start, duration and end were all processed while the job
            // itself may still need a final check, hence the history set - confirm against SLA commands.
            if (eventProc == 7) {
                historySet.add(jobId);
            }
            removeAndDecrement(jobId);
            LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
        }
        else {
            if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
                // Update last modified time.
                slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
                reloadExpectedTimeAndConfig(slaCalc);
                LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
            }
            if (firstCheckAfterRetstart || isChanged(slaCalc)) {
                LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(),
                        slaCalc.getEventProcessed(), slaCalc.getJobStatus());
                try {
                    SLAXCommandFactory.getSLAEventXCommand(slaCalc).call();
                    checkEventProc(slaCalc);
                }
                catch (XException e) {
                    if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
                        LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId());
                        removeAndDecrement(jobId);
                    }
                    else if (firstCheckAfterRetstart) {
                        // The job stays in the map; dropping the registration bean makes the next
                        // check load it from the database again.
                        slaCalc.setSLARegistrationBean(null);
                    }
                }
            }
        }
    }

    /**
     * Decides whether an SLA event command has to be run for the job, based on the event-processed
     * bits and on whether the expected start / duration / end (plus the configured job event latency)
     * has already passed.
     *
     * @param slaCalc status of the job; its registration bean must be loaded
     * @return {@code true} if an SLA event may have to be generated
     */
    private boolean isChanged(SLACalcStatus slaCalc) {
        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
        byte eventProc = slaCalc.getEventProcessed();
        final long now = System.currentTimeMillis();

        if ((eventProc & 1) == 0) { // first bit (start-processed) unset
            if (reg.getExpectedStart() == null) {
                return true;
            }
            if (reg.getExpectedStart().getTime() + jobEventLatency < now) {
                return true;
            }
        }
        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { // second bit unset
            if (reg.getExpectedDuration() == -1) {
                return true;
            }
            if (slaCalc.getActualStart() != null
                    && (reg.getExpectedDuration() + jobEventLatency) < (now - slaCalc.getActualStart().getTime())) {
                return true;
            }
        }
        if (eventProc < 4) { // third bit unset
            if (reg.getExpectedEnd().getTime() + jobEventLatency < now) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stamps the status with the current time and queues the updates that persist the SLA
     * configuration and the new last-modified time.
     */
    @SuppressWarnings("rawtypes")
    private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) throws JPAExecutorException {
        updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean()));
        slaCalc.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
                new SLASummaryBean(slaCalc)));
    }

    /**
     * Stamps the status with the current time and queues the updates that persist the expected
     * values of the registration and of the summary.
     */
    @SuppressWarnings("rawtypes")
    private void updateDBSlaExpectedValues(SLACalcStatus slaCalc, List<UpdateEntry> updateList)
            throws JPAExecutorException {
        slaCalc.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_EXPECTED_VALUE, slaCalc
                .getSLARegistrationBean()));
        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
                new SLASummaryBean(slaCalc)));
    }

    /**
     * Executes the queued updates through the batch query executor.
     */
    @SuppressWarnings("rawtypes")
    private void executeBatchQuery(List<UpdateEntry> updateList) throws JPAExecutorException {
        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
    }

    /**
     * Periodically run by the SLAService worker threads to update SLA status by
     * iterating through all the jobs in the map. A failure for one job is logged
     * and does not stop the processing of the remaining jobs.
     */
    @Override
    public void updateAllSlaStatus() {
        LOG.info("Running periodic SLA check");
        for (String jobId : slaMap.keySet()) {
            try {
                LOG.trace("Processing SLA for jobid={0}", jobId);
                updateJobSla(jobId);
            }
            catch (Exception e) {
                setLogPrefix(jobId);
                LOG.error("Exception in SLA processing for job [{0}]", jobId, e);
                LogUtils.clearLogPrefix();
            }
        }
    }

    /**
     * Register a new job into the map for SLA tracking and insert its registration and summary
     * records into the database.
     *
     * @param jobId job id
     * @param reg SLA registration of the job
     * @return true if successful, false if the map has reached its configured capacity
     * @throws JPAExecutorException if the database insert fails
     */
    @Override
    public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
        if (slaMap.size() >= capacity) {
            logCapacityReached(reg);
            return false;
        }
        SLACalcStatus slaCalc = trackNotStarted(jobId, reg);
        List<JsonBean> insertList = new ArrayList<JsonBean>();
        final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc);
        final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date());
        reg.setCreatedTimestamp(currentTime);
        summaryBean.setCreatedTimestamp(currentTime);
        insertList.add(reg);
        insertList.add(summaryBean);
        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
        LOG.trace("SLA Registration Event - Job:" + jobId);
        return true;
    }

    /**
     * Maps an application type to the status name used for a job that has just been registered.
     *
     * @return the initial status name, or {@code null} for application types not handled here
     */
    private String getJobStatus(AppType appType) {
        String status = null;
        switch (appType) {
            case COORDINATOR_ACTION:
                status = CoordinatorAction.Status.WAITING.name();
                break;
            case WORKFLOW_ACTION:
                status = WorkflowAction.Status.PREP.name();
                break;
            case WORKFLOW_JOB:
                status = WorkflowJob.Status.PREP.name();
                break;
            default:
                break;
        }
        return status;
    }

    /**
     * Update job into the map for SLA tracking and update its registration and summary records in
     * the database.
     *
     * @param jobId job id
     * @param reg new SLA registration of the job
     * @return true if successful, false if the map has reached its configured capacity
     * @throws JPAExecutorException if the database update fails
     */
    @Override
    public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
        if (slaMap.size() >= capacity) {
            logCapacityReached(reg);
            return false;
        }
        SLACalcStatus slaCalc = trackNotStarted(jobId, reg);

        @SuppressWarnings("rawtypes")
        List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
        updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg));
        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
                new SLASummaryBean(slaCalc)));
        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
        LOG.trace("SLA Registration Event - Job:" + jobId);
        return true;
    }

    /**
     * Creates a NOT_STARTED status for the registration and puts it into the SLA map.
     */
    private SLACalcStatus trackNotStarted(String jobId, SLARegistrationBean reg) {
        SLACalcStatus slaCalc = new SLACalcStatus(reg);
        slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
        slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
        putAndIncrement(jobId, slaCalc);
        return slaCalc;
    }

    /**
     * Logs, with the job id as log prefix, that the registration was rejected because the map is full.
     */
    private void logCapacityReached(SLARegistrationBean reg) {
        setLogPrefix(reg.getId());
        LOG.error(
                "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
                reg.getId());
        LogUtils.clearLogPrefix();
    }

    /**
     * Remove job from being tracked in map; if it was not in the map it is removed from the history set.
     */
    @Override
    public void removeRegistration(String jobId) {
        if (!removeAndDecrement(jobId)) {
            historySet.remove(jobId);
        }
    }

    /**
     * Triggered after receiving Job status change event, update SLA status
     * accordingly.
     *
     * @return true if the job is SLA-tracked and the SLA event command ran, false if the job has no
     *         SLA registration
     * @throws JPAExecutorException if a database query fails
     * @throws ServiceException if the SLA event command fails
     */
    @Override
    public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
            Date endTime) throws JPAExecutorException, ServiceException {
        LOG.debug(
                "Received addJobStatus request for job  [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], "
                        + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime);
        SLACalcStatus slaCalc = slaMap.get(jobId);
        boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc);
        if (slaCalc == null) {
            SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
                    SLARegQuery.GET_SLA_REG_ALL, jobId);
            if (slaRegBean != null) { // filter out jobs picked by SLA job event listener
                                      // but not actually configured for SLA
                SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
                        SLASummaryQuery.GET_SLA_SUMMARY, jobId);
                slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
                putAndIncrement(jobId, slaCalc);
            }
        }
        else {
            SLASummaryBean summaryBean = SLASummaryQueryExecutor.getInstance().get(
                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
            byte eventProc = summaryBean.getEventProcessed();
            if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
                // Update last modified time.
                slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
                reloadExpectedTimeAndConfig(slaCalc);
                LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
            }
            slaCalc.setEventProcessed(eventProc);
        }
        if (slaCalc == null) {
            return false;
        }
        try {
            SLAXCommandFactory.getSLAEventXCommand(slaCalc,
                    ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 1000)).call();
            checkEventProc(slaCalc);
        }
        catch (XException e) {
            if (firstCheckAfterRetstart) {
                // force the registration to be loaded from the database again on the next check
                slaCalc.setSLARegistrationBean(null);
            }
            LOG.error(e);
            throw new ServiceException(e);
        }
        return true;
    }

    /**
     * Removes the job from the map once its event-processed value shows that SLA processing is
     * complete; a value of exactly 7 additionally records the job in the history set.
     */
    private void checkEventProc(SLACalcStatus slaCalc) {
        byte eventProc = slaCalc.getEventProcessed();
        if (eventProc >= 8) {
            removeAndDecrement(slaCalc.getId());
            LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId());
        }
        else if (eventProc == 7) {
            historySet.add(slaCalc.getId());
            removeAndDecrement(slaCalc.getId());
            LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId());
        }
    }

    /**
     * Re-reads the expected values and SLA configuration of the job from the database and copies the
     * values that are set into the in-memory registration bean.
     *
     * @param slaCalc status whose registration bean is refreshed
     * @throws JPAExecutorException if the database query fails
     */
    public void reloadExpectedTimeAndConfig(SLACalcStatus slaCalc) throws JPAExecutorException {
        SLARegistrationBean regBean = SLARegistrationQueryExecutor.getInstance().get(
                SLARegQuery.GET_SLA_EXPECTED_VALUE_CONFIG, slaCalc.getId());

        if (regBean.getExpectedDuration() > 0) {
            slaCalc.getSLARegistrationBean().setExpectedDuration(regBean.getExpectedDuration());
        }
        if (regBean.getExpectedEnd() != null) {
            slaCalc.getSLARegistrationBean().setExpectedEnd(regBean.getExpectedEnd());
        }
        if (regBean.getExpectedStart() != null) {
            slaCalc.getSLARegistrationBean().setExpectedStart(regBean.getExpectedStart());
        }
        if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) {
            slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
                    regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT));
        }
        if (regBean.getNominalTime() != null) {
            slaCalc.getSLARegistrationBean().setNominalTime(regBean.getNominalTime());
        }
    }

    @VisibleForTesting
    public boolean isJobIdInSLAMap(String jobId) {
        return this.slaMap.containsKey(jobId);
    }

    @VisibleForTesting
    public boolean isJobIdInHistorySet(String jobId) {
        return this.historySet.contains(jobId);
    }

    /**
     * Replaces the class logger with one carrying the job id as log info.
     */
    private void setLogPrefix(String jobId) {
        LOG = LogUtils.setLogInfo(LOG, jobId, null, null);
    }

    /**
     * Removes the disable-alert flag from the SLA configuration of the given jobs and persists it.
     *
     * @param jobIds ids of the jobs
     * @return true if at least one job id was processed
     */
    @Override
    public boolean enableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
        boolean isJobFound = false;
        @SuppressWarnings("rawtypes")
        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
        for (String jobId : jobIds) {
            SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId);
            slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT);
            updateDBSlaConfig(slaCalc, updateList);
            isJobFound = true;
        }
        executeBatchQuery(updateList);
        return isJobFound;
    }

    /**
     * Enables alerts for every SLA-registered child of the given parent jobs.
     */
    @Override
    public boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
        return enableAlert(getSLAJobsforParents(parentJobIds));
    }

    /**
     * Sets the disable-alert flag in the SLA configuration of the given jobs and persists it.
     *
     * @param jobIds ids of the jobs
     * @return true if at least one job id was processed
     */
    @Override
    public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
        boolean isJobFound = false;
        @SuppressWarnings("rawtypes")
        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();

        for (String jobId : jobIds) {
            SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId);
            slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true));
            updateDBSlaConfig(slaCalc, updateList);
            isJobFound = true;
        }
        executeBatchQuery(updateList);
        return isJobFound;
    }

    /**
     * Disables alerts for every SLA-registered child of the given parent jobs.
     */
    @Override
    public boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
        return disableAlert(getSLAJobsforParents(parentJobIds));
    }

    /**
     * Applies new SLA parameters to the given jobs and persists the resulting expected values.
     *
     * @param jobIdsSLAPair pairs of job id and the new SLA parameters for that job
     * @return true if at least one pair was processed
     */
    @Override
    public boolean changeDefinition(List<Pair<String, Map<String, String>>> jobIdsSLAPair) throws JPAExecutorException,
            ServiceException {
        boolean isJobFound = false;
        @SuppressWarnings("rawtypes")
        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
        for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) {
            SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobIdSLAPair.getFirst());
            updateParams(slaCalc, jobIdSLAPair.getSecond());
            updateDBSlaExpectedValues(slaCalc, updateList);
            isJobFound = true;
        }
        executeBatchQuery(updateList);
        return isJobFound;
    }

    /**
     * Applies the nominal time, should-start, should-end and max-duration parameters to the
     * registration bean of the status. Does nothing when {@code newParams} is {@code null}.
     *
     * @throws ServiceException wrapping the CommandException raised by {@link SLAOperations}
     */
    private void updateParams(SLACalcStatus slaCalc, Map<String, String> newParams) throws ServiceException {
        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
        if (newParams == null) {
            return;
        }
        try {
            Date newNominal = SLAOperations.setNominalTime(newParams.get(RestConstants.SLA_NOMINAL_TIME), reg);
            SLAOperations.setExpectedStart(newParams.get(RestConstants.SLA_SHOULD_START), newNominal, reg);
            SLAOperations.setExpectedEnd(newParams.get(RestConstants.SLA_SHOULD_END), newNominal, reg);
            SLAOperations.setExpectedDuration(newParams.get(RestConstants.SLA_MAX_DURATION), reg);
        }
        catch (CommandException ce) {
            throw new ServiceException(ce);
        }
    }

    /**
     * Collects the ids of all SLA registrations whose parent is one of the given jobs.
     */
    private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException {
        List<String> childJobIds = new ArrayList<String>();
        for (String jobId : parentJobIds) {
            List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList(
                    SLARegQuery.GET_SLA_REG_FOR_PARENT_ID, jobId);
            for (SLARegistrationBean bean : registrationBeanList) {
                childJobIds.add(bean.getId());
            }
        }
        return childJobIds;
    }

    /**
     * Loads the registration bean for a status that has none yet (as is the case for statuses built
     * from a summary bean only on restart).
     *
     * @param slaCalc status to check, may be {@code null}
     * @return true if the registration bean was loaded by this call
     */
    private boolean checkAndUpdateSLACalcAfterRestart(SLACalcStatus slaCalc) throws JPAExecutorException {
        if (slaCalc != null && slaCalc.getSLARegistrationBean() == null) {
            return updateSLARegistartion(slaCalc);
        }
        return false;
    }

    /**
     * Loads the registration bean of the status from the database if it is still missing. The check
     * is repeated while holding the monitor of {@code slaCalc} so that concurrent callers load it
     * only once.
     *
     * @param slaCalc status whose registration bean may have to be loaded
     * @return true if this call loaded the registration bean, false if it was already present
     * @throws JPAExecutorException if the database query fails
     */
    public boolean updateSLARegistartion(SLACalcStatus slaCalc) throws JPAExecutorException {
        if (slaCalc.getSLARegistrationBean() == null) {
            synchronized (slaCalc) {
                if (slaCalc.getSLARegistrationBean() == null) {
                    SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance()
                            .get(SLARegQuery.GET_SLA_REG_ON_RESTART, slaCalc.getId());
                    slaCalc.updateSLARegistrationBean(slaRegBean);
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Puts the status into the SLA map and increments the instrumentation counter when the job id
     * was not present before.
     *
     * @return true if a new entry was added, false if an existing entry was replaced
     */
    private boolean putAndIncrement(final String jobId, final SLACalcStatus newStatus) {
        if (slaMap.put(jobId, newStatus) == null) {
            LOG.trace("Added a new item to SLA map. [jobId={0}]", jobId);
            instrumentation.incr(INSTRUMENTATION_GROUP, SLA_MAP, 1);
            return true;
        }

        LOG.trace("Updated an existing item in SLA map. [jobId={0}]", jobId);
        return false;
    }

    /**
     * Removes the job from the SLA map and decrements the instrumentation counter when an entry was
     * actually removed.
     *
     * @return true if an entry was removed, false if the job id was not in the map
     */
    private boolean removeAndDecrement(final String jobId) {
        if (slaMap.remove(jobId) != null) {
            LOG.trace("Removed an existing item from SLA map. [jobId={0}]", jobId);
            instrumentation.decr(INSTRUMENTATION_GROUP, SLA_MAP, 1);
            return true;
        }

        LOG.trace("Tried to remove a non-existing item from SLA map. [jobId={0}]", jobId);
        return false;
    }
}