This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.sla;
019
020 import java.util.ArrayList;
021 import java.util.Collections;
022 import java.util.Date;
023 import java.util.HashSet;
024 import java.util.Iterator;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Set;
028 import java.util.concurrent.ConcurrentHashMap;
029
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.oozie.AppType;
032 import org.apache.oozie.CoordinatorActionBean;
033 import org.apache.oozie.ErrorCode;
034 import org.apache.oozie.WorkflowActionBean;
035 import org.apache.oozie.WorkflowJobBean;
036 import org.apache.oozie.client.CoordinatorAction;
037 import org.apache.oozie.client.WorkflowAction;
038 import org.apache.oozie.client.WorkflowJob;
039 import org.apache.oozie.client.event.JobEvent;
040 import org.apache.oozie.client.event.SLAEvent.EventStatus;
041 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
042 import org.apache.oozie.client.rest.JsonBean;
043 import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
044 import org.apache.oozie.executor.jpa.JPAExecutorException;
045 import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
046 import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
047
048 import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
049 import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
050 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
051 import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
052 import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusActualTimesJPAExecutor;
053 import org.apache.oozie.service.EventHandlerService;
054 import org.apache.oozie.service.JPAService;
055 import org.apache.oozie.service.ServiceException;
056 import org.apache.oozie.service.Services;
057 import org.apache.oozie.sla.service.SLAService;
058 import org.apache.oozie.util.XLog;
059
060
061 /**
062 * Implementation class for SLACalculator that calculates SLA related to
063 * start/end/duration of jobs using a memory-based map
064 */
065 public class SLACalculatorMemory implements SLACalculator {
066
067 private static final XLog LOG = XLog.getLog(SLACalculatorMemory.class);
068 // TODO optimization priority based insertion/processing/bumping up-down
069 private static Map<String, SLACalcStatus> slaMap;
070 private static Set<String> historySet;
071 private static int capacity;
072 private static JPAService jpaService;
073 private EventHandlerService eventHandler;
074 private static int modifiedAfter;
075 private static long jobEventLatency;
076
077 @Override
078 public void init(Configuration conf) throws ServiceException {
079 capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000);
080 jobEventLatency = conf.getInt(SLAService.CONF_JOB_EVENT_LATENCY, 90 * 1000);
081 slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
082 historySet = Collections.synchronizedSet(new HashSet<String>());
083 jpaService = Services.get().get(JPAService.class);
084 eventHandler = Services.get().get(EventHandlerService.class);
085 // load events modified after
086 modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
087 loadOnRestart();
088
089 }
090
091 private void loadOnRestart() {
092 boolean isJobModified = false;
093 try {
094 long slaPendingCount = 0;
095 long statusPendingCount = 0;
096 List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
097 modifiedAfter));
098 for (SLASummaryBean summaryBean : summaryBeans) {
099 String jobId = summaryBean.getId();
100 try {
101 switch (summaryBean.getAppType()) {
102 case COORDINATOR_ACTION:
103 isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
104 break;
105 case WORKFLOW_ACTION:
106 isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
107 break;
108 case WORKFLOW_JOB:
109 isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
110 break;
111 default:
112 break;
113 }
114 if (isJobModified) {
115 jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(summaryBean));
116 }
117 }
118 catch (Exception e) {
119 LOG.warn("Failed to load records for " + jobId, e);
120 }
121 try {
122 if (summaryBean.getEventProcessed() == 7) {
123 historySet.add(jobId);
124 statusPendingCount++;
125 }
126 else if (summaryBean.getEventProcessed() <= 7) {
127 SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
128 jobId));
129 SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
130 slaMap.put(jobId, slaCalcStatus);
131 slaPendingCount++;
132 }
133 }
134 catch (Exception e) {
135 LOG.warn("Failed to fetch/update records for " + jobId, e);
136 }
137
138 }
139 LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount);
140
141 }
142 catch (Exception e) {
143 LOG.warn("Failed to retrieve SLASummary records on restart", e);
144 }
145 }
146
147 private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId)
148 throws JPAExecutorException {
149 boolean isJobModified = false;
150 CoordinatorActionBean coordAction = null;
151 coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId));
152 if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
153 LOG.trace("Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is "
154 + summaryBean.getJobStatus());
155 isJobModified = true;
156 summaryBean.setJobStatus(coordAction.getStatusStr());
157 if (coordAction.isTerminalStatus()) {
158 WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
159 .getExternalId()));
160 setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(),
161 coordAction.getStatusStr());
162 }
163 else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) {
164 WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
165 .getExternalId()));
166 setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
167 }
168 }
169 return isJobModified;
170 }
171
172 private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId)
173 throws JPAExecutorException {
174 boolean isJobModified = false;
175 WorkflowActionBean wfAction = null;
176 wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId));
177 if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
178 LOG.trace("Workflow action status is " + wfAction.getStatusStr() + "and summary bean status is "
179 + summaryBean.getJobStatus());
180 isJobModified = true;
181 summaryBean.setJobStatus(wfAction.getStatusStr());
182 if (wfAction.inTerminalState()) {
183 setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
184 }
185 else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
186 setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime());
187 }
188 }
189 return isJobModified;
190 }
191
192 private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId)
193 throws JPAExecutorException {
194 boolean isJobModified = false;
195 WorkflowJobBean wfJob = null;
196 wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
197 if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
198 LOG.trace("Workflow job status is " + wfJob.getStatusStr() + "and summary bean status is "
199 + summaryBean.getJobStatus());
200 isJobModified = true;
201 summaryBean.setJobStatus(wfJob.getStatusStr());
202 if (wfJob.inTerminalState()) {
203 setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
204 }
205 else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
206 setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
207 }
208 }
209 return isJobModified;
210 }
211
212 private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) {
213 byte eventProc = summaryBean.getEventProcessed();
214 summaryBean.setEventProcessed(8);
215 summaryBean.setActualStart(startTime);
216 summaryBean.setActualEnd(endTime);
217 long actualDuration = endTime.getTime() - startTime.getTime();
218 summaryBean.setActualDuration(actualDuration);
219 if (eventProc < 4) {
220 if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || status.equals(WorkflowAction.Status.OK.name())
221 || status.equals(CoordinatorAction.Status.SUCCEEDED.name())) {
222 if (endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) {
223 summaryBean.setSLAStatus(SLAStatus.MET);
224 }
225 else {
226 summaryBean.setSLAStatus(SLAStatus.MISS);
227 }
228 }
229 else {
230 summaryBean.setSLAStatus(SLAStatus.MISS);
231 }
232 }
233
234 }
235
236 private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) {
237 if (((eventProc & 1) == 0)) {
238 eventProc += 1;
239 summaryBean.setEventProcessed(eventProc);
240 }
241 if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
242 summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
243 }
244 summaryBean.setActualStart(startTime);
245 }
246
247 @Override
248 public int size() {
249 return slaMap.size();
250 }
251
252 @Override
253 public SLACalcStatus get(String jobId) throws JPAExecutorException {
254 SLACalcStatus memObj;
255 memObj = slaMap.get(jobId);
256 if (memObj == null && historySet.contains(jobId)) {
257 memObj = new SLACalcStatus(jpaService.execute(new SLASummaryGetJPAExecutor(jobId)),
258 jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(jobId)));
259 }
260 return memObj;
261 }
262
263 @Override
264 public Iterator<String> iterator() {
265 return slaMap.keySet().iterator();
266 }
267
268 @Override
269 public boolean isEmpty() {
270 return slaMap.isEmpty();
271 }
272
273 @Override
274 public void clear() {
275 slaMap.clear();
276 historySet.clear();
277 }
278
279 /**
280 * Invoked via periodic run, update the SLA for registered jobs
281 */
282 protected void updateJobSla(String jobId) throws JPAExecutorException, ServiceException {
283 SLACalcStatus slaCalc = slaMap.get(jobId);
284 synchronized (slaCalc) {
285 boolean change = false;
286 byte eventProc = slaCalc.getEventProcessed();
287 SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
288 // calculation w.r.t current time and status
289 if ((eventProc & 1) == 0) { // first bit (start-processed) unset
290 if (reg.getExpectedStart() != null) {
291 if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
292 confirmWithDB(slaCalc);
293 eventProc = slaCalc.getEventProcessed();
294 if (eventProc != 8 && (eventProc & 1 ) == 0) {
295 //Some DB exception
296 slaCalc.setEventStatus(EventStatus.START_MISS);
297 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
298 eventProc++;
299 }
300 change = true;
301 }
302 }
303 else {
304 eventProc++; //disable further processing for optional start sla condition
305 change = true;
306 }
307 }
308 if (((eventProc >> 1) & 1) == 0 && eventProc != 8) { // check if second bit (duration-processed) is unset
309 if (reg.getExpectedDuration() == -1) {
310 eventProc += 2;
311 change = true;
312 }
313 else if (slaCalc.getActualStart() != null) {
314 if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
315 .getActualStart().getTime())) {
316 slaCalc.setEventProcessed(eventProc);
317 confirmWithDB(slaCalc);
318 eventProc = slaCalc.getEventProcessed();
319 if (eventProc != 8 && ((eventProc >> 1) & 1 ) == 0) {
320 //Some DB exception
321 slaCalc.setEventStatus(EventStatus.DURATION_MISS);
322 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
323 eventProc += 2;
324 }
325 change = true;
326 }
327 }
328 }
329 if (eventProc < 4) {
330 if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
331 slaCalc.setEventProcessed(eventProc);
332 confirmWithDB(slaCalc);
333 eventProc = slaCalc.getEventProcessed();
334 change = true;
335 }
336 }
337 if (change) {
338 if (slaCalc.getEventProcessed() >= 8) { //no more processing, no transfer to history set
339 eventProc = 8;
340 slaCalc.setEventProcessed(8); // Should not be > 8. But to handle any corner cases.
341 slaMap.remove(jobId);
342 }
343 else {
344 slaCalc.setEventProcessed(eventProc);
345 }
346 SLASummaryBean slaSummaryBean = new SLASummaryBean();
347 slaSummaryBean.setId(slaCalc.getId());
348 slaSummaryBean.setEventProcessed(eventProc);
349 slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
350 slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
351 slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
352 slaSummaryBean.setActualStart(slaCalc.getActualStart());
353 slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
354 slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
355 jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaSummaryBean));
356 if (eventProc == 7) {
357 historySet.add(jobId);
358 slaMap.remove(jobId);
359 LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
360 }
361
362 }
363 }
364 }
365
366 /**
367 * Periodically run by the SLAService worker threads to update SLA status by
368 * iterating through all the jobs in the map
369 */
370 @Override
371 public void updateAllSlaStatus() {
372 LOG.info("Running periodic SLA check");
373 Iterator<String> iterator = slaMap.keySet().iterator();
374 while (iterator.hasNext()) {
375 String jobId = iterator.next();
376 try {
377 LOG.trace("Processing SLA for jobid={0}", jobId);
378 updateJobSla(jobId);
379 }
380 catch (Exception e) {
381 LOG.error("Exception in SLA processing for job [{0}]", jobId, e);
382 }
383 }
384 }
385
386 /**
387 * Register a new job into the map for SLA tracking
388 */
389 @Override
390 public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
391 try {
392 if (slaMap.size() < capacity) {
393 SLACalcStatus slaCalc = new SLACalcStatus(reg);
394 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
395 slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
396 slaMap.put(jobId, slaCalc);
397 List<JsonBean> insertList = new ArrayList<JsonBean>();
398 insertList.add(reg);
399 insertList.add(new SLASummaryBean(slaCalc));
400 jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
401 LOG.trace("SLA Registration Event - Job:" + jobId);
402 return true;
403 }
404 else {
405 LOG.error(
406 "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
407 reg.getId());
408 }
409 }
410 catch (JPAExecutorException jpa) {
411 throw jpa;
412 }
413 return false;
414 }
415
416 private String getJobStatus(AppType appType) {
417 String status = null;
418 switch (appType) {
419 case COORDINATOR_ACTION:
420 status = CoordinatorAction.Status.WAITING.name();
421 break;
422 case WORKFLOW_ACTION:
423 status = WorkflowAction.Status.PREP.name();
424 break;
425 case WORKFLOW_JOB:
426 status = WorkflowJob.Status.PREP.name();
427 break;
428 default:
429 break;
430 }
431 return status;
432 }
433
434 /**
435 * Update job into the map for SLA tracking
436 */
437 @Override
438 public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
439 try {
440 if (slaMap.size() < capacity) {
441 SLACalcStatus slaCalc = new SLACalcStatus(reg);
442 slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
443 slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
444 slaMap.put(jobId, slaCalc);
445 List<JsonBean> updateList = new ArrayList<JsonBean>();
446 updateList.add(reg);
447 updateList.add(new SLASummaryBean(slaCalc));
448 jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
449 LOG.trace("SLA Registration Event - Job:" + jobId);
450 return true;
451 }
452 else {
453 LOG.error(
454 "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
455 reg.getId());
456 }
457 }
458 catch (JPAExecutorException jpa) {
459 throw jpa;
460 }
461 return false;
462 }
463
464 /**
465 * Remove job from being tracked in map
466 */
467 @Override
468 public void removeRegistration(String jobId) {
469 if (slaMap.remove(jobId) == null) {
470 historySet.remove(jobId);
471 }
472 }
473
474 /**
475 * Triggered after receiving Job status change event, update SLA status
476 * accordingly
477 */
478 @Override
479 public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
480 Date endTime) throws JPAExecutorException, ServiceException {
481 SLACalcStatus slaCalc = slaMap.get(jobId);
482 SLASummaryBean slaInfo = null;
483 boolean hasSla = false;
484 if (slaCalc != null) {
485 synchronized (slaCalc) {
486 slaCalc.setJobStatus(jobStatus);
487 switch (jobEventStatus) {
488 case STARTED:
489 slaInfo = processJobStartSLA(slaCalc, startTime);
490 break;
491 case SUCCESS:
492 slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
493 break;
494 case FAILURE:
495 slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
496 break;
497 default:
498 LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
499 slaInfo = getSLASummaryBean(slaCalc);
500 }
501
502 if (slaCalc.getEventProcessed() == 7) {
503 slaInfo.setEventProcessed(8);
504 slaMap.remove(jobId);
505 }
506 hasSla = true;
507 }
508 LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
509 }
510 else if (historySet.contains(jobId)) {
511 slaInfo = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
512 if (slaInfo == null) {
513 throw new JPAExecutorException(ErrorCode.E0604, jobId);
514 }
515 slaInfo.setJobStatus(jobStatus);
516 slaInfo.setActualStart(startTime);
517 slaInfo.setActualEnd(endTime);
518 if (endTime != null) {
519 slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
520 }
521 slaInfo.setEventProcessed(8);
522 historySet.remove(jobId);
523 hasSla = true;
524 }
525
526 if (hasSla) {
527 jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaInfo));
528 }
529 return hasSla;
530 }
531
532 /**
533 * Process SLA for jobs that started running. Also update actual-start time
534 *
535 * @param slaCalc
536 * @param actualStart
537 * @return SLASummaryBean
538 */
539 private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) {
540 slaCalc.setActualStart(actualStart);
541 if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
542 slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
543 }
544 SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
545 Date expecStart = reg.getExpectedStart();
546 byte eventProc = slaCalc.getEventProcessed();
547 // set event proc here
548 if (((eventProc & 1) == 0)) {
549 if (expecStart != null) {
550 if (actualStart.getTime() > expecStart.getTime()) {
551 slaCalc.setEventStatus(EventStatus.START_MISS);
552 }
553 else {
554 slaCalc.setEventStatus(EventStatus.START_MET);
555 }
556 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
557 }
558 eventProc += 1;
559 slaCalc.setEventProcessed(eventProc);
560 }
561 return getSLASummaryBean(slaCalc);
562 }
563
564 /**
565 * Process SLA for jobs that ended successfully. Also update actual-start
566 * and end time
567 *
568 * @param slaCalc
569 * @param actualStart
570 * @param actualEnd
571 * @return SLASummaryBean
572 * @throws JPAExecutorException
573 */
574 private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
575 SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
576 slaCalc.setActualStart(actualStart);
577 slaCalc.setActualEnd(actualEnd);
578 long expectedDuration = reg.getExpectedDuration();
579 long actualDuration = actualEnd.getTime() - actualStart.getTime();
580 slaCalc.setActualDuration(actualDuration);
581 //check event proc
582 byte eventProc = slaCalc.getEventProcessed();
583 if (((eventProc >> 1) & 1) == 0) {
584 processDurationSLA(expectedDuration, actualDuration, slaCalc);
585 eventProc += 2;
586 slaCalc.setEventProcessed(eventProc);
587 }
588
589 if (eventProc < 4) {
590 Date expectedEnd = reg.getExpectedEnd();
591 if (actualEnd.getTime() > expectedEnd.getTime()) {
592 slaCalc.setEventStatus(EventStatus.END_MISS);
593 slaCalc.setSLAStatus(SLAStatus.MISS);
594 }
595 else {
596 slaCalc.setEventStatus(EventStatus.END_MET);
597 slaCalc.setSLAStatus(SLAStatus.MET);
598 }
599 eventProc += 4;
600 slaCalc.setEventProcessed(eventProc);
601 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
602 }
603 return getSLASummaryBean(slaCalc);
604 }
605
606 /**
607 * Process SLA for jobs that ended in failure. Also update actual-start and
608 * end time
609 *
610 * @param slaCalc
611 * @param actualStart
612 * @param actualEnd
613 * @return SLASummaryBean
614 * @throws JPAExecutorException
615 */
616 private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
617 slaCalc.setActualStart(actualStart);
618 slaCalc.setActualEnd(actualEnd);
619 if (actualStart == null) { // job failed before starting
620 if (slaCalc.getEventProcessed() < 4) {
621 slaCalc.setEventStatus(EventStatus.END_MISS);
622 slaCalc.setSLAStatus(SLAStatus.MISS);
623 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
624 slaCalc.setEventProcessed(7);
625 return getSLASummaryBean(slaCalc);
626 }
627 }
628 SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
629 long expectedDuration = reg.getExpectedDuration();
630 long actualDuration = actualEnd.getTime() - actualStart.getTime();
631 slaCalc.setActualDuration(actualDuration);
632
633 byte eventProc = slaCalc.getEventProcessed();
634 if (((eventProc >> 1) & 1) == 0) {
635 if (expectedDuration != -1) {
636 slaCalc.setEventStatus(EventStatus.DURATION_MISS);
637 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
638 }
639 eventProc += 2;
640 slaCalc.setEventProcessed(eventProc);
641 }
642 if (eventProc < 4) {
643 slaCalc.setEventStatus(EventStatus.END_MISS);
644 slaCalc.setSLAStatus(SLAStatus.MISS);
645 eventProc += 4;
646 slaCalc.setEventProcessed(eventProc);
647 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
648 }
649 return getSLASummaryBean(slaCalc);
650 }
651
652 private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) {
653 SLASummaryBean slaSummaryBean = new SLASummaryBean();
654 slaSummaryBean.setActualStart(slaCalc.getActualStart());
655 slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
656 slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
657 slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
658 slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
659 slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
660 slaSummaryBean.setId(slaCalc.getId());
661 slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
662 return slaSummaryBean;
663 }
664
665 private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
666 if (expected != -1 && actual > expected) {
667 slaCalc.setEventStatus(EventStatus.DURATION_MISS);
668 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
669 }
670 else if (expected != -1 && actual <= expected) {
671 slaCalc.setEventStatus(EventStatus.DURATION_MET);
672 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
673 }
674 }
675
676 /*
677 * Confirm alerts against source of truth - DB. Also required in case of High Availability
678 */
679 private void confirmWithDB(SLACalcStatus slaCalc) {
680 boolean ended = false, isEndMiss = false;
681 try {
682 switch (slaCalc.getAppType()) {
683 case WORKFLOW_JOB:
684 WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId()));
685 if (wf.getEndTime() != null) {
686 ended = true;
687 if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
688 || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
689 isEndMiss = true;
690 }
691 }
692 slaCalc.setActualStart(wf.getStartTime());
693 slaCalc.setActualEnd(wf.getEndTime());
694 slaCalc.setJobStatus(wf.getStatusStr());
695 break;
696 case WORKFLOW_ACTION:
697 WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId()));
698 if (wa.getEndTime() != null) {
699 ended = true;
700 if (wa.isTerminalWithFailure()
701 || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
702 isEndMiss = true;
703 }
704 }
705 slaCalc.setActualStart(wa.getStartTime());
706 slaCalc.setActualEnd(wa.getEndTime());
707 slaCalc.setJobStatus(wa.getStatusStr());
708 break;
709 case COORDINATOR_ACTION:
710 CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
711 if (ca.isTerminalWithFailure()) {
712 isEndMiss = ended = true;
713 slaCalc.setActualStart(null);
714 slaCalc.setActualEnd(ca.getLastModifiedTime());
715 }
716 if (ca.getExternalId() != null) {
717 wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId()));
718 if (wf.getEndTime() != null) {
719 ended = true;
720 if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
721 isEndMiss = true;
722 }
723 }
724 slaCalc.setActualEnd(wf.getEndTime());
725 slaCalc.setActualStart(wf.getStartTime());
726 }
727 slaCalc.setJobStatus(ca.getStatusStr());
728 break;
729 default:
730 LOG.debug("Unsupported App-type for SLA - " + slaCalc.getAppType());
731 }
732
733 byte eventProc = slaCalc.getEventProcessed();
734 if (ended) {
735 if (isEndMiss) {
736 slaCalc.setSLAStatus(SLAStatus.MISS);
737 }
738 else {
739 slaCalc.setSLAStatus(SLAStatus.MET);
740 }
741 if (slaCalc.getActualStart() != null) {
742 if ((eventProc & 1) == 0) {
743 if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
744 slaCalc.setEventStatus(EventStatus.START_MISS);
745 }
746 else {
747 slaCalc.setEventStatus(EventStatus.START_MET);
748 }
749 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
750 }
751 slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
752 if (((eventProc >> 1) & 1) == 0) {
753 processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
754 }
755 }
756 if (eventProc < 4) {
757 if (isEndMiss) {
758 slaCalc.setEventStatus(EventStatus.END_MISS);
759 }
760 else {
761 slaCalc.setEventStatus(EventStatus.END_MET);
762 }
763 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
764 }
765 slaCalc.setEventProcessed(8);
766 }
767 else {
768 if (slaCalc.getActualStart() != null) {
769 slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
770 }
771 if ((eventProc & 1) == 0) {
772 if (slaCalc.getActualStart() != null) {
773 if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
774 slaCalc.setEventStatus(EventStatus.START_MISS);
775 }
776 else {
777 slaCalc.setEventStatus(EventStatus.START_MET);
778 }
779 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
780 eventProc++;
781 }
782 else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
783 slaCalc.setEventStatus(EventStatus.START_MISS);
784 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
785 eventProc++;
786 }
787 }
788 if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != null
789 && slaCalc.getExpectedDuration() != -1) {
790 if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
791 slaCalc.setEventStatus(EventStatus.DURATION_MISS);
792 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
793 eventProc += 2;
794 }
795 }
796 if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
797 slaCalc.setEventStatus(EventStatus.END_MISS);
798 slaCalc.setSLAStatus(SLAStatus.MISS);
799 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
800 eventProc += 4;
801 }
802 slaCalc.setEventProcessed(eventProc);
803 }
804 }
805 catch (Exception e) {
806 LOG.warn("Error while confirming SLA against DB for jobid= " + slaCalc.getId() + ". Exception is "
807 + e.getClass().getName() + ": " + e.getMessage());
808 if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
809 slaCalc.setEventStatus(EventStatus.END_MISS);
810 slaCalc.setSLAStatus(SLAStatus.MISS);
811 eventHandler.queueEvent(new SLACalcStatus(slaCalc));
812 slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4);
813 }
814 }
815 }
816
817 }