001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.sla; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.Date; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.oozie.AppType; 034import org.apache.oozie.ErrorCode; 035import org.apache.oozie.XException; 036import org.apache.oozie.client.CoordinatorAction; 037import org.apache.oozie.client.OozieClient; 038import org.apache.oozie.client.WorkflowAction; 039import org.apache.oozie.client.WorkflowJob; 040import org.apache.oozie.client.event.JobEvent; 041import org.apache.oozie.client.event.SLAEvent.SLAStatus; 042import org.apache.oozie.client.rest.JsonBean; 043import org.apache.oozie.client.rest.RestConstants; 044import org.apache.oozie.command.CommandException; 045import org.apache.oozie.executor.jpa.BatchQueryExecutor; 046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 047import org.apache.oozie.executor.jpa.JPAExecutorException; 048import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; 049import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; 050import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; 051import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; 052import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor; 053import org.apache.oozie.service.ConfigurationService; 054import org.apache.oozie.service.EventHandlerService; 055import org.apache.oozie.service.JPAService; 056import org.apache.oozie.service.SchedulerService; 057import org.apache.oozie.service.ServiceException; 058import org.apache.oozie.service.Services; 059import org.apache.oozie.sla.service.SLAService; 060import org.apache.oozie.util.DateUtils; 061import org.apache.oozie.util.LogUtils; 062import org.apache.oozie.util.XLog; 063import org.apache.oozie.util.Pair; 064 065import com.google.common.annotations.VisibleForTesting; 066 067/** 068 * Implementation class for SLACalculator that calculates SLA related to 069 * start/end/duration of jobs using a memory-based map 070 */ 071public class SLACalculatorMemory implements SLACalculator { 072 073 private static XLog LOG = XLog.getLog(SLACalculatorMemory.class); 074 // TODO optimization priority based insertion/processing/bumping up-down 075 protected Map<String, SLACalcStatus> slaMap; 076 protected Set<String> historySet; 077 private static int capacity; 078 private static JPAService jpaService; 079 protected EventHandlerService eventHandler; 080 private static int modifiedAfter; 081 private static long jobEventLatency; 082 083 @Override 084 public void init(Configuration conf) throws ServiceException { 085 capacity = ConfigurationService.getInt(conf, SLAService.CONF_CAPACITY); 086 jobEventLatency = ConfigurationService.getInt(conf, SLAService.CONF_JOB_EVENT_LATENCY); 087 slaMap = new ConcurrentHashMap<String, SLACalcStatus>(); 088 historySet = Collections.synchronizedSet(new HashSet<String>()); 089 jpaService = Services.get().get(JPAService.class); 090 eventHandler = Services.get().get(EventHandlerService.class); 091 // load events modified after 092 modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7); 093 loadOnRestart(); 094 Runnable purgeThread = new HistoryPurgeWorker(); 095 // schedule runnable by default 1 hours 096 Services.get() 097 .get(SchedulerService.class) 098 .schedule(purgeThread, 3600, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 3600), 099 SchedulerService.Unit.SEC); 100 } 101 102 public class HistoryPurgeWorker extends Thread { 103 104 public HistoryPurgeWorker() { 105 } 106 107 @Override 108 public void run() { 109 if (Thread.currentThread().isInterrupted()) { 110 return; 111 } 112 Iterator<String> jobItr = historySet.iterator(); 113 while (jobItr.hasNext()) { 114 String jobId = jobItr.next(); 115 LOG.debug(" Running HistoryPurgeWorker for " + jobId); 116 try { 117 boolean isDone = SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call(); 118 if (isDone) { 119 LOG.debug("[{0}] job is finished and processed. Removing from history"); 120 jobItr.remove(); 121 } 122 } 123 catch (CommandException e) { 124 if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { 125 LOG.warn("Job is not found in db: " + jobId, e); 126 jobItr.remove(); 127 } 128 else { 129 LOG.error("Failed to fetch the job: " + jobId, e); 130 } 131 } 132 } 133 } 134 } 135 136 private void loadOnRestart() { 137 try { 138 List<SLASummaryBean> summaryBeans = jpaService 139 .execute(new SLASummaryGetRecordsOnRestartJPAExecutor(modifiedAfter)); 140 for (SLASummaryBean summaryBean : summaryBeans) { 141 String jobId = summaryBean.getId(); 142 slaMap.put(jobId, new SLACalcStatus(summaryBean)); 143 } 144 LOG.info("Loaded {0} SLASummary object after restart", slaMap.size()); 145 } 146 catch (Exception e) { 147 LOG.warn("Failed to retrieve SLASummary records on restart", e); 148 } 149 } 150 151 @Override 152 public int size() { 153 return slaMap.size(); 154 } 155 156 @VisibleForTesting 157 public Set<String> getHistorySet(){ 158 return historySet; 159 } 160 161 @Override 162 public SLACalcStatus get(String jobId) throws JPAExecutorException { 163 SLACalcStatus memObj; 164 memObj = slaMap.get(jobId); 165 if (memObj == null && historySet.contains(jobId)) { 166 memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance() 167 .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get( 168 SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); 169 } 170 return memObj; 171 } 172 173 /** 174 * Get SLACalcStatus from map if SLARegistration is not null, else create a new SLACalcStatus 175 * This function deosn't update slaMap 176 * @param jobId 177 * @return 178 * @throws JPAExecutorException 179 */ 180 private SLACalcStatus getOrCreateSLACalcStatus(String jobId) throws JPAExecutorException { 181 SLACalcStatus memObj; 182 memObj = slaMap.get(jobId); 183 // if the request came from immediately after restart don't use map SLACalcStatus. 184 if (memObj == null || memObj.getSLARegistrationBean() == null) { 185 SLARegistrationBean registrationBean = SLARegistrationQueryExecutor.getInstance() 186 .get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId); 187 SLASummaryBean summaryBean = memObj == null 188 ? SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId) 189 : memObj.getSLASummaryBean(); 190 return new SLACalcStatus(summaryBean, registrationBean); 191 } 192 return memObj; 193 } 194 195 @Override 196 public Iterator<String> iterator() { 197 return slaMap.keySet().iterator(); 198 } 199 200 @Override 201 public boolean isEmpty() { 202 return slaMap.isEmpty(); 203 } 204 205 @Override 206 public void clear() { 207 slaMap.clear(); 208 historySet.clear(); 209 } 210 211 /** 212 * Invoked via periodic run, update the SLA for registered jobs 213 */ 214 protected void updateJobSla(String jobId) throws Exception { 215 SLACalcStatus slaCalc = slaMap.get(jobId); 216 217 if (slaCalc == null) { 218 // job might be processed and removed from map by addJobStatus 219 return; 220 } 221 boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc); 222 // get eventProcessed on DB for validation in HA 223 SLASummaryBean summaryBean = null; 224 try { 225 summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()) 226 .get(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); 227 } 228 catch (JPAExecutorException e) { 229 if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { 230 LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId); 231 slaMap.remove(jobId); 232 return; 233 } 234 throw e; 235 } 236 byte eventProc = summaryBean.getEventProcessed(); 237 slaCalc.setEventProcessed(eventProc); 238 if (eventProc >= 7) { 239 if (eventProc == 7) { 240 historySet.add(jobId); 241 } 242 slaMap.remove(jobId); 243 LOG.trace("Removed Job [{0}] from map as SLA processed", jobId); 244 } 245 else { 246 if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { 247 // Update last modified time. 248 slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); 249 reloadExpectedTimeAndConfig(slaCalc); 250 LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); 251 } 252 if (firstCheckAfterRetstart || isChanged(slaCalc)) { 253 LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(), 254 slaCalc.getEventProcessed(), slaCalc.getJobStatus()); 255 try { 256 SLAXCommandFactory.getSLAEventXCommand(slaCalc).call(); 257 checkEventProc(slaCalc); 258 } 259 catch (XException e) { 260 if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { 261 LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId()); 262 slaMap.remove(jobId); 263 } 264 else { 265 if (firstCheckAfterRetstart) { 266 slaCalc.setSLARegistrationBean(null); 267 } 268 } 269 } 270 } 271 } 272 } 273 274 private boolean isChanged(SLACalcStatus slaCalc) { 275 SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); 276 byte eventProc = slaCalc.getEventProcessed(); 277 278 if ((eventProc & 1) == 0) { // first bit (start-processed) unset 279 if (reg.getExpectedStart() != null) { 280 if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) { 281 return true; 282 } 283 } 284 else { 285 return true; 286 } 287 } 288 if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { 289 if (reg.getExpectedDuration() == -1) { 290 return true; 291 } 292 else if (slaCalc.getActualStart() != null) { 293 if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc 294 .getActualStart().getTime())) { 295 return true; 296 } 297 } 298 } 299 if (eventProc < 4) { 300 if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) { 301 return true; 302 } 303 } 304 return false; 305 } 306 307 @SuppressWarnings("rawtypes") 308 private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) throws JPAExecutorException { 309 updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean())); 310 slaCalc.setLastModifiedTime(new Date()); 311 updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME, 312 new SLASummaryBean(slaCalc))); 313 } 314 315 @SuppressWarnings("rawtypes") 316 private void updateDBSlaExpectedValues(SLACalcStatus slaCalc, List<UpdateEntry> updateList) 317 throws JPAExecutorException { 318 slaCalc.setLastModifiedTime(new Date()); 319 updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_EXPECTED_VALUE, slaCalc 320 .getSLARegistrationBean())); 321 updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES, 322 new SLASummaryBean(slaCalc))); 323 } 324 325 @SuppressWarnings("rawtypes") 326 private void executeBatchQuery(List<UpdateEntry> updateList) throws JPAExecutorException { 327 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 328 } 329 330 /** 331 * Periodically run by the SLAService worker threads to update SLA status by 332 * iterating through all the jobs in the map 333 */ 334 @Override 335 public void updateAllSlaStatus() { 336 LOG.info("Running periodic SLA check"); 337 Iterator<String> iterator = slaMap.keySet().iterator(); 338 while (iterator.hasNext()) { 339 String jobId = iterator.next(); 340 try { 341 LOG.trace("Processing SLA for jobid={0}", jobId); 342 updateJobSla(jobId); 343 } 344 catch (Exception e) { 345 setLogPrefix(jobId); 346 LOG.error("Exception in SLA processing for job [{0}]", jobId, e); 347 LogUtils.clearLogPrefix(); 348 } 349 } 350 } 351 352 /** 353 * Register a new job into the map for SLA tracking 354 */ 355 @Override 356 public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException { 357 try { 358 if (slaMap.size() < capacity) { 359 SLACalcStatus slaCalc = new SLACalcStatus(reg); 360 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); 361 slaCalc.setJobStatus(getJobStatus(reg.getAppType())); 362 slaMap.put(jobId, slaCalc); 363 List<JsonBean> insertList = new ArrayList<JsonBean>(); 364 final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc); 365 final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date()); 366 reg.setCreatedTimestamp(currentTime); 367 summaryBean.setCreatedTimestamp(currentTime); 368 insertList.add(reg); 369 insertList.add(summaryBean); 370 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); 371 LOG.trace("SLA Registration Event - Job:" + jobId); 372 return true; 373 } 374 else { 375 setLogPrefix(reg.getId()); 376 LOG.error( 377 "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]", 378 reg.getId()); 379 LogUtils.clearLogPrefix(); 380 } 381 } 382 catch (JPAExecutorException jpa) { 383 throw jpa; 384 } 385 return false; 386 } 387 388 private String getJobStatus(AppType appType) { 389 String status = null; 390 switch (appType) { 391 case COORDINATOR_ACTION: 392 status = CoordinatorAction.Status.WAITING.name(); 393 break; 394 case WORKFLOW_ACTION: 395 status = WorkflowAction.Status.PREP.name(); 396 break; 397 case WORKFLOW_JOB: 398 status = WorkflowJob.Status.PREP.name(); 399 break; 400 default: 401 break; 402 } 403 return status; 404 } 405 406 /** 407 * Update job into the map for SLA tracking 408 */ 409 @Override 410 public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException { 411 try { 412 if (slaMap.size() < capacity) { 413 SLACalcStatus slaCalc = new SLACalcStatus(reg); 414 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); 415 slaCalc.setJobStatus(getJobStatus(reg.getAppType())); 416 slaMap.put(jobId, slaCalc); 417 418 @SuppressWarnings("rawtypes") 419 List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 420 updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg)); 421 updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, 422 new SLASummaryBean(slaCalc))); 423 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 424 LOG.trace("SLA Registration Event - Job:" + jobId); 425 return true; 426 } 427 else { 428 setLogPrefix(reg.getId()); 429 LOG.error( 430 "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]", 431 reg.getId()); 432 LogUtils.clearLogPrefix(); 433 } 434 } 435 catch (JPAExecutorException jpa) { 436 throw jpa; 437 } 438 return false; 439 } 440 441 /** 442 * Remove job from being tracked in map 443 */ 444 @Override 445 public void removeRegistration(String jobId) { 446 if (slaMap.remove(jobId) == null) { 447 historySet.remove(jobId); 448 } 449 } 450 451 /** 452 * Triggered after receiving Job status change event, update SLA status 453 * accordingly 454 */ 455 @Override 456 public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime, 457 Date endTime) throws JPAExecutorException, ServiceException { 458 LOG.debug( 459 "Received addJobStatus request for job [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], " 460 + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime); 461 SLACalcStatus slaCalc = slaMap.get(jobId); 462 boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc); 463 if (slaCalc == null) { 464 SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( 465 SLARegQuery.GET_SLA_REG_ALL, jobId); 466 if (slaRegBean != null) { // filter out jobs picked by SLA job event listener 467 // but not actually configured for SLA 468 SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get( 469 SLASummaryQuery.GET_SLA_SUMMARY, jobId); 470 slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean); 471 slaMap.put(jobId, slaCalc); 472 } 473 } 474 else { 475 SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get( 476 SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); 477 byte eventProc = summaryBean.getEventProcessed(); 478 if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { 479 // Update last modified time. 480 slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); 481 reloadExpectedTimeAndConfig(slaCalc); 482 LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); 483 484 } 485 slaCalc.setEventProcessed(eventProc); 486 } 487 if (slaCalc != null) { 488 try { 489 SLAXCommandFactory.getSLAEventXCommand(slaCalc, 490 ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 1000)).call(); 491 checkEventProc(slaCalc); 492 } 493 catch (XException e) { 494 if (firstCheckAfterRetstart) { 495 slaCalc.setSLARegistrationBean(null); 496 } 497 LOG.error(e); 498 throw new ServiceException(e); 499 } 500 return true; 501 } 502 else { 503 return false; 504 } 505 } 506 507 private void checkEventProc(SLACalcStatus slaCalc){ 508 byte eventProc = slaCalc.getEventProcessed(); 509 if (slaCalc.getEventProcessed() >= 8) { 510 slaMap.remove(slaCalc.getId()); 511 LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId()); 512 } 513 if (eventProc == 7) { 514 historySet.add(slaCalc.getId()); 515 slaMap.remove(slaCalc.getId()); 516 LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId()); 517 } 518 } 519 520 public void reloadExpectedTimeAndConfig(SLACalcStatus slaCalc) throws JPAExecutorException { 521 SLARegistrationBean regBean = SLARegistrationQueryExecutor.getInstance().get( 522 SLARegQuery.GET_SLA_EXPECTED_VALUE_CONFIG, slaCalc.getId()); 523 524 if (regBean.getExpectedDuration() > 0) { 525 slaCalc.getSLARegistrationBean().setExpectedDuration(regBean.getExpectedDuration()); 526 } 527 if (regBean.getExpectedEnd() != null) { 528 slaCalc.getSLARegistrationBean().setExpectedEnd(regBean.getExpectedEnd()); 529 } 530 if (regBean.getExpectedStart() != null) { 531 slaCalc.getSLARegistrationBean().setExpectedStart(regBean.getExpectedStart()); 532 } 533 if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) { 534 slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, 535 regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)); 536 } 537 if (regBean.getNominalTime() != null) { 538 slaCalc.getSLARegistrationBean().setNominalTime(regBean.getNominalTime()); 539 } 540 } 541 542 @VisibleForTesting 543 public boolean isJobIdInSLAMap(String jobId) { 544 return this.slaMap.containsKey(jobId); 545 } 546 547 @VisibleForTesting 548 public boolean isJobIdInHistorySet(String jobId) { 549 return this.historySet.contains(jobId); 550 } 551 552 private void setLogPrefix(String jobId) { 553 LOG = LogUtils.setLogInfo(LOG, jobId, null, null); 554 } 555 556 @Override 557 public boolean enableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException { 558 boolean isJobFound = false; 559 @SuppressWarnings("rawtypes") 560 List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); 561 for (String jobId : jobIds) { 562 SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId); 563 slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT); 564 updateDBSlaConfig(slaCalc, updateList); 565 isJobFound = true; 566 } 567 executeBatchQuery(updateList); 568 return isJobFound; 569 } 570 571 @Override 572 public boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException { 573 return enableAlert(getSLAJobsforParents(parentJobIds)); 574 } 575 576 @Override 577 public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException { 578 boolean isJobFound = false; 579 @SuppressWarnings("rawtypes") 580 List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); 581 582 for (String jobId : jobIds) { 583 SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId); 584 slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true)); 585 updateDBSlaConfig(slaCalc, updateList); 586 isJobFound = true; 587 } 588 executeBatchQuery(updateList); 589 return isJobFound; 590 } 591 592 @Override 593 public boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException { 594 return disableAlert(getSLAJobsforParents(parentJobIds)); 595 } 596 597 @Override 598 public boolean changeDefinition(List<Pair<String, Map<String, String>>> jobIdsSLAPair) throws JPAExecutorException, 599 ServiceException { 600 boolean isJobFound = false; 601 @SuppressWarnings("rawtypes") 602 List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); 603 for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) { 604 SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobIdSLAPair.getFist()); 605 updateParams(slaCalc, jobIdSLAPair.getSecond()); 606 updateDBSlaExpectedValues(slaCalc, updateList); 607 isJobFound = true; 608 } 609 executeBatchQuery(updateList); 610 return isJobFound; 611 } 612 613 private void updateParams(SLACalcStatus slaCalc, Map<String, String> newParams) throws ServiceException { 614 SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); 615 if (newParams != null) { 616 try { 617 Date newNominal = SLAOperations.setNominalTime(newParams.get(RestConstants.SLA_NOMINAL_TIME), reg); 618 SLAOperations.setExpectedStart(newParams.get(RestConstants.SLA_SHOULD_START), newNominal, reg); 619 SLAOperations.setExpectedEnd(newParams.get(RestConstants.SLA_SHOULD_END), newNominal, reg); 620 SLAOperations.setExpectedDuration(newParams.get(RestConstants.SLA_MAX_DURATION), reg); 621 } 622 catch (CommandException ce) { 623 throw new ServiceException(ce); 624 } 625 } 626 } 627 628 private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException { 629 List<String> childJobIds = new ArrayList<String>(); 630 for (String jobId : parentJobIds) { 631 List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList( 632 SLARegQuery.GET_SLA_REG_FOR_PARENT_ID, jobId); 633 for (SLARegistrationBean bean : registrationBeanList) { 634 childJobIds.add(bean.getId()); 635 } 636 } 637 return childJobIds; 638 } 639 640 private boolean checkAndUpdateSLACalcAfterRestart(SLACalcStatus slaCalc) throws JPAExecutorException { 641 if (slaCalc != null && slaCalc.getSLARegistrationBean() == null) { 642 return updateSLARegistartion(slaCalc); 643 } 644 return false; 645 } 646 647 public boolean updateSLARegistartion(SLACalcStatus slaCalc) throws JPAExecutorException { 648 if (slaCalc.getSLARegistrationBean() == null) { 649 synchronized (slaCalc) { 650 if (slaCalc.getSLARegistrationBean() == null) { 651 SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance() 652 .get(SLARegQuery.GET_SLA_REG_ON_RESTART, slaCalc.getId()); 653 slaCalc.updateSLARegistrationBean(slaRegBean); 654 return true; 655 } 656 } 657 } 658 return false; 659 } 660}