001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.sla; 020 021import java.sql.Timestamp; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.Date; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.oozie.AppType; 034import org.apache.oozie.ErrorCode; 035import org.apache.oozie.XException; 036import org.apache.oozie.client.CoordinatorAction; 037import org.apache.oozie.client.OozieClient; 038import org.apache.oozie.client.WorkflowAction; 039import org.apache.oozie.client.WorkflowJob; 040import org.apache.oozie.client.event.JobEvent; 041import org.apache.oozie.client.event.SLAEvent.SLAStatus; 042import org.apache.oozie.client.rest.JsonBean; 043import org.apache.oozie.client.rest.RestConstants; 044import org.apache.oozie.command.CommandException; 045import org.apache.oozie.executor.jpa.BatchQueryExecutor; 046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 047import org.apache.oozie.executor.jpa.JPAExecutorException; 048import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; 049import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; 050import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; 051import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; 052import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor; 053import org.apache.oozie.service.ConfigurationService; 054import org.apache.oozie.service.EventHandlerService; 055import org.apache.oozie.service.JPAService; 056import org.apache.oozie.service.SchedulerService; 057import org.apache.oozie.service.ServiceException; 058import org.apache.oozie.service.Services; 059import org.apache.oozie.sla.service.SLAService; 060import org.apache.oozie.util.DateUtils; 061import org.apache.oozie.util.LogUtils; 062import org.apache.oozie.util.XLog; 063import org.apache.oozie.util.Pair; 064 065import com.google.common.annotations.VisibleForTesting; 066 067/** 068 * Implementation class for SLACalculator that calculates SLA related to 069 * start/end/duration of jobs using a memory-based map 070 */ 071public class SLACalculatorMemory implements SLACalculator { 072 073 private static XLog LOG = XLog.getLog(SLACalculatorMemory.class); 074 // TODO optimization priority based insertion/processing/bumping up-down 075 protected Map<String, SLACalcStatus> slaMap; 076 protected Set<String> historySet; 077 private static int capacity; 078 private static JPAService jpaService; 079 protected EventHandlerService eventHandler; 080 private static int modifiedAfter; 081 private static long jobEventLatency; 082 083 @Override 084 public void init(Configuration conf) throws ServiceException { 085 capacity = ConfigurationService.getInt(conf, SLAService.CONF_CAPACITY); 086 jobEventLatency = ConfigurationService.getInt(conf, SLAService.CONF_JOB_EVENT_LATENCY); 087 slaMap = new ConcurrentHashMap<String, SLACalcStatus>(); 088 historySet = Collections.synchronizedSet(new HashSet<String>()); 089 jpaService = Services.get().get(JPAService.class); 090 eventHandler = Services.get().get(EventHandlerService.class); 091 // load events modified after 092 modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7); 093 loadOnRestart(); 094 Runnable purgeThread = new HistoryPurgeWorker(); 095 // schedule runnable by default 1 hours 096 Services.get() 097 .get(SchedulerService.class) 098 .schedule(purgeThread, 3600, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 3600), 099 SchedulerService.Unit.SEC); 100 } 101 102 public class HistoryPurgeWorker extends Thread { 103 104 public HistoryPurgeWorker() { 105 } 106 107 @Override 108 public void run() { 109 if (Thread.currentThread().isInterrupted()) { 110 return; 111 } 112 Iterator<String> jobItr = historySet.iterator(); 113 while (jobItr.hasNext()) { 114 String jobId = jobItr.next(); 115 LOG.debug(" Running HistoryPurgeWorker for " + jobId); 116 try { 117 boolean isDone = SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call(); 118 if (isDone) { 119 LOG.debug("[{0}] job is finished and processed. Removing from history"); 120 jobItr.remove(); 121 } 122 } 123 catch (CommandException e) { 124 if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { 125 LOG.warn("Job is not found in db: " + jobId, e); 126 jobItr.remove(); 127 } 128 else { 129 LOG.error("Failed to fetch the job: " + jobId, e); 130 } 131 } 132 } 133 } 134 } 135 136 private void loadOnRestart() { 137 long slaPendingCount = 0; 138 long statusPendingCount = 0; 139 140 try { 141 List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor( 142 modifiedAfter)); 143 for (SLASummaryBean summaryBean : summaryBeans) { 144 String jobId = summaryBean.getId(); 145 146 SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( 147 SLARegQuery.GET_SLA_REG_ON_RESTART, jobId); 148 SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean); 149 150 // Processed missed jobs 151 try { 152 SLAXCommandFactory.getSLAEventXCommand(slaCalcStatus).call(); 153 } 154 catch (Throwable e) { 155 LOG.error("Error while updating job {0}", slaCalcStatus.getId(), e); 156 } 157 158 if (slaCalcStatus.getEventProcessed() == 7) { 159 historySet.add(jobId); 160 statusPendingCount++; 161 LOG.debug("Adding job [{0}] to historySet. EventProcessed is [{1}]", slaCalcStatus, 162 slaCalcStatus); 163 } 164 else if (slaCalcStatus.getEventProcessed() < 7) { 165 slaMap.put(jobId, slaCalcStatus); 166 slaPendingCount++; 167 LOG.debug("Adding job [{0}] to slamap. EventProcessed is [{1}]", slaCalcStatus, 168 slaCalcStatus); 169 170 } 171 } 172 LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount); 173 } 174 catch (Exception e) { 175 LOG.warn("Failed to retrieve SLASummary records on restart", e); 176 } 177 } 178 179 @Override 180 public int size() { 181 return slaMap.size(); 182 } 183 184 @VisibleForTesting 185 public Set<String> getHistorySet(){ 186 return historySet; 187 } 188 189 @Override 190 public SLACalcStatus get(String jobId) throws JPAExecutorException { 191 SLACalcStatus memObj; 192 memObj = slaMap.get(jobId); 193 if (memObj == null && historySet.contains(jobId)) { 194 memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance() 195 .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get( 196 SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); 197 } 198 return memObj; 199 } 200 201 private SLACalcStatus getSLACalcStatus(String jobId) throws JPAExecutorException { 202 SLACalcStatus memObj; 203 memObj = slaMap.get(jobId); 204 if (memObj == null) { 205 memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance() 206 .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get( 207 SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); 208 } 209 return memObj; 210 } 211 212 @Override 213 public Iterator<String> iterator() { 214 return slaMap.keySet().iterator(); 215 } 216 217 @Override 218 public boolean isEmpty() { 219 return slaMap.isEmpty(); 220 } 221 222 @Override 223 public void clear() { 224 slaMap.clear(); 225 historySet.clear(); 226 } 227 228 /** 229 * Invoked via periodic run, update the SLA for registered jobs 230 */ 231 protected void updateJobSla(String jobId) throws Exception { 232 SLACalcStatus slaCalc = slaMap.get(jobId); 233 234 if (slaCalc == null) { 235 // job might be processed and removed from map by addJobStatus 236 return; 237 } 238 synchronized (slaCalc) { 239 // get eventProcessed on DB for validation in HA 240 SLASummaryBean summaryBean = null; 241 try { 242 summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get( 243 SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); 244 } 245 catch (JPAExecutorException e) { 246 if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { 247 LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId); 248 slaMap.remove(jobId); 249 return; 250 } 251 throw e; 252 } 253 byte eventProc = summaryBean.getEventProcessed(); 254 slaCalc.setEventProcessed(eventProc); 255 if (eventProc >= 7) { 256 if (eventProc == 7) { 257 historySet.add(jobId); 258 } 259 slaMap.remove(jobId); 260 LOG.trace("Removed Job [{0}] from map as SLA processed", jobId); 261 } 262 else { 263 if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { 264 // Update last modified time. 265 slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); 266 reloadExpectedTimeAndConfig(slaCalc); 267 LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); 268 } 269 if (isChanged(slaCalc)) { 270 LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(), 271 slaCalc.getEventProcessed(), slaCalc.getJobStatus()); 272 try { 273 SLAXCommandFactory.getSLAEventXCommand(slaCalc).call(); 274 checkEventProc(slaCalc); 275 } 276 catch (XException e) { 277 if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { 278 LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId()); 279 slaMap.remove(jobId); 280 } 281 } 282 } 283 284 } 285 } 286 } 287 288 private boolean isChanged(SLACalcStatus slaCalc) { 289 SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); 290 byte eventProc = slaCalc.getEventProcessed(); 291 292 if ((eventProc & 1) == 0) { // first bit (start-processed) unset 293 if (reg.getExpectedStart() != null) { 294 if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) { 295 return true; 296 } 297 } 298 else { 299 return true; 300 } 301 } 302 if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { 303 if (reg.getExpectedDuration() == -1) { 304 return true; 305 } 306 else if (slaCalc.getActualStart() != null) { 307 if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc 308 .getActualStart().getTime())) { 309 return true; 310 } 311 } 312 } 313 if (eventProc < 4) { 314 if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) { 315 return true; 316 } 317 } 318 return false; 319 } 320 321 @SuppressWarnings("rawtypes") 322 private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) throws JPAExecutorException { 323 updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean())); 324 slaCalc.setLastModifiedTime(new Date()); 325 updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME, 326 new SLASummaryBean(slaCalc))); 327 } 328 329 @SuppressWarnings("rawtypes") 330 private void updateDBSlaExpectedValues(SLACalcStatus slaCalc, List<UpdateEntry> updateList) 331 throws JPAExecutorException { 332 slaCalc.setLastModifiedTime(new Date()); 333 updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_EXPECTED_VALUE, slaCalc 334 .getSLARegistrationBean())); 335 updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES, 336 new SLASummaryBean(slaCalc))); 337 } 338 339 @SuppressWarnings("rawtypes") 340 private void executeBatchQuery(List<UpdateEntry> updateList) throws JPAExecutorException { 341 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 342 } 343 344 /** 345 * Periodically run by the SLAService worker threads to update SLA status by 346 * iterating through all the jobs in the map 347 */ 348 @Override 349 public void updateAllSlaStatus() { 350 LOG.info("Running periodic SLA check"); 351 Iterator<String> iterator = slaMap.keySet().iterator(); 352 while (iterator.hasNext()) { 353 String jobId = iterator.next(); 354 try { 355 LOG.trace("Processing SLA for jobid={0}", jobId); 356 updateJobSla(jobId); 357 } 358 catch (Exception e) { 359 setLogPrefix(jobId); 360 LOG.error("Exception in SLA processing for job [{0}]", jobId, e); 361 LogUtils.clearLogPrefix(); 362 } 363 } 364 } 365 366 /** 367 * Register a new job into the map for SLA tracking 368 */ 369 @Override 370 public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException { 371 try { 372 if (slaMap.size() < capacity) { 373 SLACalcStatus slaCalc = new SLACalcStatus(reg); 374 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); 375 slaCalc.setJobStatus(getJobStatus(reg.getAppType())); 376 slaMap.put(jobId, slaCalc); 377 List<JsonBean> insertList = new ArrayList<JsonBean>(); 378 final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc); 379 final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date()); 380 reg.setCreatedTimestamp(currentTime); 381 summaryBean.setCreatedTimestamp(currentTime); 382 insertList.add(reg); 383 insertList.add(summaryBean); 384 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); 385 LOG.trace("SLA Registration Event - Job:" + jobId); 386 return true; 387 } 388 else { 389 setLogPrefix(reg.getId()); 390 LOG.error( 391 "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]", 392 reg.getId()); 393 LogUtils.clearLogPrefix(); 394 } 395 } 396 catch (JPAExecutorException jpa) { 397 throw jpa; 398 } 399 return false; 400 } 401 402 private String getJobStatus(AppType appType) { 403 String status = null; 404 switch (appType) { 405 case COORDINATOR_ACTION: 406 status = CoordinatorAction.Status.WAITING.name(); 407 break; 408 case WORKFLOW_ACTION: 409 status = WorkflowAction.Status.PREP.name(); 410 break; 411 case WORKFLOW_JOB: 412 status = WorkflowJob.Status.PREP.name(); 413 break; 414 default: 415 break; 416 } 417 return status; 418 } 419 420 /** 421 * Update job into the map for SLA tracking 422 */ 423 @Override 424 public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException { 425 try { 426 if (slaMap.size() < capacity) { 427 SLACalcStatus slaCalc = new SLACalcStatus(reg); 428 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); 429 slaCalc.setJobStatus(getJobStatus(reg.getAppType())); 430 slaMap.put(jobId, slaCalc); 431 432 @SuppressWarnings("rawtypes") 433 List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 434 updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg)); 435 updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, 436 new SLASummaryBean(slaCalc))); 437 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); 438 LOG.trace("SLA Registration Event - Job:" + jobId); 439 return true; 440 } 441 else { 442 setLogPrefix(reg.getId()); 443 LOG.error( 444 "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]", 445 reg.getId()); 446 LogUtils.clearLogPrefix(); 447 } 448 } 449 catch (JPAExecutorException jpa) { 450 throw jpa; 451 } 452 return false; 453 } 454 455 /** 456 * Remove job from being tracked in map 457 */ 458 @Override 459 public void removeRegistration(String jobId) { 460 if (slaMap.remove(jobId) == null) { 461 historySet.remove(jobId); 462 } 463 } 464 465 /** 466 * Triggered after receiving Job status change event, update SLA status 467 * accordingly 468 */ 469 @Override 470 public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime, 471 Date endTime) throws JPAExecutorException, ServiceException { 472 LOG.debug( 473 "Received addJobStatus request for job [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], " 474 + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime); 475 SLACalcStatus slaCalc = slaMap.get(jobId); 476 477 if (slaCalc == null) { 478 SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( 479 SLARegQuery.GET_SLA_REG_ALL, jobId); 480 if (slaRegBean != null) { // filter out jobs picked by SLA job event listener 481 // but not actually configured for SLA 482 SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get( 483 SLASummaryQuery.GET_SLA_SUMMARY, jobId); 484 slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean); 485 slaMap.put(jobId, slaCalc); 486 } 487 } 488 else { 489 SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get( 490 SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); 491 byte eventProc = summaryBean.getEventProcessed(); 492 if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { 493 // Update last modified time. 494 slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); 495 reloadExpectedTimeAndConfig(slaCalc); 496 LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); 497 498 } 499 slaCalc.setEventProcessed(eventProc); 500 } 501 if (slaCalc != null) { 502 try { 503 SLAXCommandFactory.getSLAEventXCommand(slaCalc, 504 ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 1000)).call(); 505 checkEventProc(slaCalc); 506 } 507 catch (XException e) { 508 LOG.error(e); 509 throw new ServiceException(e); 510 } 511 return true; 512 } 513 else { 514 return false; 515 } 516 } 517 518 private void checkEventProc(SLACalcStatus slaCalc){ 519 byte eventProc = slaCalc.getEventProcessed(); 520 if (slaCalc.getEventProcessed() >= 8) { 521 slaMap.remove(slaCalc.getId()); 522 LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId()); 523 } 524 if (eventProc == 7) { 525 historySet.add(slaCalc.getId()); 526 slaMap.remove(slaCalc.getId()); 527 LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId()); 528 } 529 } 530 531 public void reloadExpectedTimeAndConfig(SLACalcStatus slaCalc) throws JPAExecutorException { 532 SLARegistrationBean regBean = SLARegistrationQueryExecutor.getInstance().get( 533 SLARegQuery.GET_SLA_EXPECTED_VALUE_CONFIG, slaCalc.getId()); 534 535 if (regBean.getExpectedDuration() > 0) { 536 slaCalc.getSLARegistrationBean().setExpectedDuration(regBean.getExpectedDuration()); 537 } 538 if (regBean.getExpectedEnd() != null) { 539 slaCalc.getSLARegistrationBean().setExpectedEnd(regBean.getExpectedEnd()); 540 } 541 if (regBean.getExpectedStart() != null) { 542 slaCalc.getSLARegistrationBean().setExpectedStart(regBean.getExpectedStart()); 543 } 544 if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) { 545 slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, 546 regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)); 547 } 548 if (regBean.getNominalTime() != null) { 549 slaCalc.getSLARegistrationBean().setNominalTime(regBean.getNominalTime()); 550 } 551 } 552 553 @VisibleForTesting 554 public boolean isJobIdInSLAMap(String jobId) { 555 return this.slaMap.containsKey(jobId); 556 } 557 558 @VisibleForTesting 559 public boolean isJobIdInHistorySet(String jobId) { 560 return this.historySet.contains(jobId); 561 } 562 563 private void setLogPrefix(String jobId) { 564 LOG = LogUtils.setLogInfo(LOG, jobId, null, null); 565 } 566 567 @Override 568 public boolean enableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException { 569 boolean isJobFound = false; 570 @SuppressWarnings("rawtypes") 571 List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); 572 for (String jobId : jobIds) { 573 SLACalcStatus slaCalc = getSLACalcStatus(jobId); 574 if (slaCalc != null) { 575 slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT); 576 updateDBSlaConfig(slaCalc, updateList); 577 isJobFound = true; 578 } 579 } 580 executeBatchQuery(updateList); 581 return isJobFound; 582 } 583 584 @Override 585 public boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException { 586 return enableAlert(getSLAJobsforParents(parentJobIds)); 587 } 588 589 @Override 590 public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException { 591 boolean isJobFound = false; 592 @SuppressWarnings("rawtypes") 593 List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); 594 595 for (String jobId : jobIds) { 596 SLACalcStatus slaCalc = getSLACalcStatus(jobId); 597 if (slaCalc != null) { 598 slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, 599 Boolean.toString(true)); 600 updateDBSlaConfig(slaCalc, updateList); 601 isJobFound = true; 602 } 603 } 604 executeBatchQuery(updateList); 605 return isJobFound; 606 } 607 608 @Override 609 public boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException { 610 return disableAlert(getSLAJobsforParents(parentJobIds)); 611 } 612 613 @Override 614 public boolean changeDefinition(List<Pair<String, Map<String, String>>> jobIdsSLAPair) throws JPAExecutorException, 615 ServiceException { 616 boolean isJobFound = false; 617 @SuppressWarnings("rawtypes") 618 List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); 619 for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) { 620 SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist()); 621 if (slaCalc != null) { 622 updateParams(slaCalc, jobIdSLAPair.getSecond()); 623 updateDBSlaExpectedValues(slaCalc, updateList); 624 isJobFound = true; 625 } 626 } 627 executeBatchQuery(updateList); 628 return isJobFound; 629 } 630 631 private void updateParams(SLACalcStatus slaCalc, Map<String, String> newParams) throws ServiceException { 632 SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); 633 if (newParams != null) { 634 try { 635 Date newNominal = SLAOperations.setNominalTime(newParams.get(RestConstants.SLA_NOMINAL_TIME), reg); 636 SLAOperations.setExpectedStart(newParams.get(RestConstants.SLA_SHOULD_START), newNominal, reg); 637 SLAOperations.setExpectedEnd(newParams.get(RestConstants.SLA_SHOULD_END), newNominal, reg); 638 SLAOperations.setExpectedDuration(newParams.get(RestConstants.SLA_MAX_DURATION), reg); 639 } 640 catch (CommandException ce) { 641 throw new ServiceException(ce); 642 } 643 } 644 } 645 646 private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException { 647 List<String> childJobIds = new ArrayList<String>(); 648 for (String jobId : parentJobIds) { 649 List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList( 650 SLARegQuery.GET_SLA_REG_FOR_PARENT_ID, jobId); 651 for (SLARegistrationBean bean : registrationBeanList) { 652 childJobIds.add(bean.getId()); 653 } 654 } 655 return childJobIds; 656 } 657 658}