001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.sla;
019    
020    import java.util.ArrayList;
021    import java.util.Collections;
022    import java.util.Date;
023    import java.util.HashSet;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    import java.util.concurrent.ConcurrentHashMap;
029    
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.oozie.AppType;
032    import org.apache.oozie.CoordinatorActionBean;
033    import org.apache.oozie.ErrorCode;
034    import org.apache.oozie.WorkflowActionBean;
035    import org.apache.oozie.WorkflowJobBean;
036    import org.apache.oozie.client.CoordinatorAction;
037    import org.apache.oozie.client.WorkflowAction;
038    import org.apache.oozie.client.WorkflowJob;
039    import org.apache.oozie.client.event.JobEvent;
040    import org.apache.oozie.client.event.SLAEvent.EventStatus;
041    import org.apache.oozie.client.event.SLAEvent.SLAStatus;
042    import org.apache.oozie.client.rest.JsonBean;
043    import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
044    import org.apache.oozie.executor.jpa.JPAExecutorException;
045    import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
046    import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
047    
048    import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
049    import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
050    import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
051    import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
052    import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusActualTimesJPAExecutor;
053    import org.apache.oozie.service.EventHandlerService;
054    import org.apache.oozie.service.JPAService;
055    import org.apache.oozie.service.ServiceException;
056    import org.apache.oozie.service.Services;
057    import org.apache.oozie.sla.service.SLAService;
058    import org.apache.oozie.util.XLog;
059    
060    
061    /**
062     * Implementation class for SLACalculator that calculates SLA related to
063     * start/end/duration of jobs using a memory-based map
064     */
065    public class SLACalculatorMemory implements SLACalculator {
066    
    private static final XLog LOG = XLog.getLog(SLACalculatorMemory.class);
    // TODO optimization priority based insertion/processing/bumping up-down
    // In-memory SLA tracking state keyed by job id. Filled by addRegistration/updateRegistration and, on
    // restart, from SLASummary records whose eventProcessed is below 7.
    // NOTE(review): these static fields are (re)assigned by init() on each instance, so every instance
    // shares one map/set/configuration.
    private static Map<String, SLACalcStatus> slaMap;
    // Job ids whose start/duration/end processing is complete (eventProcessed == 7). They are kept out of
    // slaMap, and their SLA details are read back from the DB when needed (get/addJobStatus).
    private static Set<String> historySet;
    // Maximum number of entries in slaMap before new registrations are rejected.
    private static int capacity;
    private static JPAService jpaService;
    private EventHandlerService eventHandler;
    // SLAService.CONF_EVENTS_MODIFIED_AFTER (default 7), passed to SLASummaryGetRecordsOnRestartJPAExecutor
    // to pick the records reloaded on restart. Presumably a number of days; TODO confirm against that executor.
    private static int modifiedAfter;
    // Allowance in milliseconds (default 90 * 1000) added to the expected start/end/duration before
    // updateJobSla checks a job for a miss.
    private static long jobEventLatency;
076    
077        @Override
078        public void init(Configuration conf) throws ServiceException {
079            capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000);
080            jobEventLatency = conf.getInt(SLAService.CONF_JOB_EVENT_LATENCY, 90 * 1000);
081            slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
082            historySet = Collections.synchronizedSet(new HashSet<String>());
083            jpaService = Services.get().get(JPAService.class);
084            eventHandler = Services.get().get(EventHandlerService.class);
085            // load events modified after
086            modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
087            loadOnRestart();
088    
089        }
090    
091        private void loadOnRestart() {
092            boolean isJobModified = false;
093            try {
094                long slaPendingCount = 0;
095                long statusPendingCount = 0;
096                List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
097                        modifiedAfter));
098                for (SLASummaryBean summaryBean : summaryBeans) {
099                    String jobId = summaryBean.getId();
100                    try {
101                        switch (summaryBean.getAppType()) {
102                            case COORDINATOR_ACTION:
103                                isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
104                                break;
105                            case WORKFLOW_ACTION:
106                                isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
107                                break;
108                            case WORKFLOW_JOB:
109                                isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
110                                break;
111                            default:
112                                break;
113                        }
114                        if (isJobModified) {
115                            jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(summaryBean));
116                        }
117                    }
118                    catch (Exception e) {
119                        LOG.warn("Failed to load records for " + jobId, e);
120                    }
121                    try {
122                        if (summaryBean.getEventProcessed() == 7) {
123                            historySet.add(jobId);
124                            statusPendingCount++;
125                        }
126                        else if (summaryBean.getEventProcessed() <= 7) {
127                            SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
128                                    jobId));
129                            SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
130                            slaMap.put(jobId, slaCalcStatus);
131                            slaPendingCount++;
132                        }
133                    }
134                    catch (Exception e) {
135                        LOG.warn("Failed to fetch/update records for " + jobId, e);
136                    }
137    
138                }
139                LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount);
140    
141            }
142            catch (Exception e) {
143                LOG.warn("Failed to retrieve SLASummary records on restart", e);
144            }
145        }
146    
147        private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId)
148                throws JPAExecutorException {
149            boolean isJobModified = false;
150            CoordinatorActionBean coordAction = null;
151            coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId));
152            if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
153                LOG.trace("Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is "
154                        + summaryBean.getJobStatus());
155                isJobModified = true;
156                summaryBean.setJobStatus(coordAction.getStatusStr());
157                if (coordAction.isTerminalStatus()) {
158                    WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
159                            .getExternalId()));
160                    setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(),
161                            coordAction.getStatusStr());
162                }
163                else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) {
164                    WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
165                            .getExternalId()));
166                    setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
167                }
168            }
169            return isJobModified;
170        }
171    
172        private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId)
173                throws JPAExecutorException {
174            boolean isJobModified = false;
175            WorkflowActionBean wfAction = null;
176            wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId));
177            if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
178                LOG.trace("Workflow action status is " + wfAction.getStatusStr() + "and summary bean status is "
179                        + summaryBean.getJobStatus());
180                isJobModified = true;
181                summaryBean.setJobStatus(wfAction.getStatusStr());
182                if (wfAction.inTerminalState()) {
183                    setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
184                }
185                else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
186                    setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime());
187                }
188            }
189            return isJobModified;
190        }
191    
192        private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId)
193                throws JPAExecutorException {
194            boolean isJobModified = false;
195            WorkflowJobBean wfJob = null;
196            wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
197            if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
198                LOG.trace("Workflow job status is " + wfJob.getStatusStr() + "and summary bean status is "
199                        + summaryBean.getJobStatus());
200                isJobModified = true;
201                summaryBean.setJobStatus(wfJob.getStatusStr());
202                if (wfJob.inTerminalState()) {
203                    setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
204                }
205                else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
206                    setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
207                }
208            }
209            return isJobModified;
210        }
211    
212        private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) {
213            byte eventProc = summaryBean.getEventProcessed();
214            summaryBean.setEventProcessed(8);
215            summaryBean.setActualStart(startTime);
216            summaryBean.setActualEnd(endTime);
217            long actualDuration = endTime.getTime() - startTime.getTime();
218            summaryBean.setActualDuration(actualDuration);
219            if (eventProc < 4) {
220                if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || status.equals(WorkflowAction.Status.OK.name())
221                        || status.equals(CoordinatorAction.Status.SUCCEEDED.name())) {
222                    if (endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) {
223                        summaryBean.setSLAStatus(SLAStatus.MET);
224                    }
225                    else {
226                        summaryBean.setSLAStatus(SLAStatus.MISS);
227                    }
228                }
229                else {
230                    summaryBean.setSLAStatus(SLAStatus.MISS);
231                }
232            }
233    
234        }
235    
236        private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) {
237            if (((eventProc & 1) == 0)) {
238                eventProc += 1;
239                summaryBean.setEventProcessed(eventProc);
240            }
241            if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
242                summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
243            }
244            summaryBean.setActualStart(startTime);
245        }
246    
247        @Override
248        public int size() {
249            return slaMap.size();
250        }
251    
252        @Override
253        public SLACalcStatus get(String jobId) throws JPAExecutorException {
254            SLACalcStatus memObj;
255            memObj = slaMap.get(jobId);
256            if (memObj == null && historySet.contains(jobId)) {
257                memObj = new SLACalcStatus(jpaService.execute(new SLASummaryGetJPAExecutor(jobId)),
258                        jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(jobId)));
259            }
260            return memObj;
261        }
262    
263        @Override
264        public Iterator<String> iterator() {
265            return slaMap.keySet().iterator();
266        }
267    
268        @Override
269        public boolean isEmpty() {
270            return slaMap.isEmpty();
271        }
272    
273        @Override
274        public void clear() {
275            slaMap.clear();
276            historySet.clear();
277        }
278    
279        /**
280         * Invoked via periodic run, update the SLA for registered jobs
281         */
282        protected void updateJobSla(String jobId) throws JPAExecutorException, ServiceException {
283            SLACalcStatus slaCalc = slaMap.get(jobId);
284            synchronized (slaCalc) {
285                boolean change = false;
286                byte eventProc = slaCalc.getEventProcessed();
287                SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
288                // calculation w.r.t current time and status
289                if ((eventProc & 1) == 0) { // first bit (start-processed) unset
290                    if (reg.getExpectedStart() != null) {
291                        if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
292                            confirmWithDB(slaCalc);
293                            eventProc = slaCalc.getEventProcessed();
294                            if (eventProc != 8 && (eventProc & 1 ) == 0) {
295                                //Some DB exception
296                                slaCalc.setEventStatus(EventStatus.START_MISS);
297                                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
298                                eventProc++;
299                            }
300                            change = true;
301                        }
302                    }
303                    else {
304                        eventProc++; //disable further processing for optional start sla condition
305                        change = true;
306                    }
307                }
308                if (((eventProc >> 1) & 1) == 0 && eventProc != 8) { // check if second bit (duration-processed) is unset
309                    if (reg.getExpectedDuration() == -1) {
310                        eventProc += 2;
311                        change = true;
312                    }
313                    else if (slaCalc.getActualStart() != null) {
314                        if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
315                                .getActualStart().getTime())) {
316                            slaCalc.setEventProcessed(eventProc);
317                            confirmWithDB(slaCalc);
318                            eventProc = slaCalc.getEventProcessed();
319                            if (eventProc != 8 && ((eventProc >> 1) & 1 ) == 0) {
320                                //Some DB exception
321                                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
322                                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
323                                eventProc += 2;
324                            }
325                            change = true;
326                        }
327                    }
328                }
329                if (eventProc < 4) {
330                    if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
331                        slaCalc.setEventProcessed(eventProc);
332                        confirmWithDB(slaCalc);
333                        eventProc = slaCalc.getEventProcessed();
334                        change = true;
335                    }
336                }
337                if (change) {
338                    if (slaCalc.getEventProcessed() >= 8) { //no more processing, no transfer to history set
339                        eventProc = 8;
340                        slaCalc.setEventProcessed(8); // Should not be > 8. But to handle any corner cases.
341                        slaMap.remove(jobId);
342                    }
343                    else {
344                        slaCalc.setEventProcessed(eventProc);
345                    }
346                    SLASummaryBean slaSummaryBean = new SLASummaryBean();
347                    slaSummaryBean.setId(slaCalc.getId());
348                    slaSummaryBean.setEventProcessed(eventProc);
349                    slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
350                    slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
351                    slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
352                    slaSummaryBean.setActualStart(slaCalc.getActualStart());
353                    slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
354                    slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
355                    jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaSummaryBean));
356                    if (eventProc == 7) {
357                        historySet.add(jobId);
358                        slaMap.remove(jobId);
359                        LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
360                    }
361    
362                }
363            }
364        }
365    
366        /**
367         * Periodically run by the SLAService worker threads to update SLA status by
368         * iterating through all the jobs in the map
369         */
370        @Override
371        public void updateAllSlaStatus() {
372            LOG.info("Running periodic SLA check");
373            Iterator<String> iterator = slaMap.keySet().iterator();
374            while (iterator.hasNext()) {
375                String jobId = iterator.next();
376                try {
377                    LOG.trace("Processing SLA for jobid={0}", jobId);
378                    updateJobSla(jobId);
379                }
380                catch (Exception e) {
381                    LOG.error("Exception in SLA processing for job [{0}]", jobId, e);
382                }
383            }
384        }
385    
386        /**
387         * Register a new job into the map for SLA tracking
388         */
389        @Override
390        public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
391            try {
392                if (slaMap.size() < capacity) {
393                    SLACalcStatus slaCalc = new SLACalcStatus(reg);
394                    slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
395                    slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
396                    slaMap.put(jobId, slaCalc);
397                    List<JsonBean> insertList = new ArrayList<JsonBean>();
398                    insertList.add(reg);
399                    insertList.add(new SLASummaryBean(slaCalc));
400                    jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(insertList, null));
401                    LOG.trace("SLA Registration Event - Job:" + jobId);
402                    return true;
403                }
404                else {
405                    LOG.error(
406                            "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
407                            reg.getId());
408                }
409            }
410            catch (JPAExecutorException jpa) {
411                throw jpa;
412            }
413            return false;
414        }
415    
416        private String getJobStatus(AppType appType) {
417            String status = null;
418            switch (appType) {
419                case COORDINATOR_ACTION:
420                    status = CoordinatorAction.Status.WAITING.name();
421                    break;
422                case WORKFLOW_ACTION:
423                    status = WorkflowAction.Status.PREP.name();
424                    break;
425                case WORKFLOW_JOB:
426                    status = WorkflowJob.Status.PREP.name();
427                    break;
428                default:
429                    break;
430            }
431            return status;
432        }
433    
434        /**
435         * Update job into the map for SLA tracking
436         */
437        @Override
438        public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
439            try {
440                if (slaMap.size() < capacity) {
441                    SLACalcStatus slaCalc = new SLACalcStatus(reg);
442                    slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
443                    slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
444                    slaMap.put(jobId, slaCalc);
445                    List<JsonBean> updateList = new ArrayList<JsonBean>();
446                    updateList.add(reg);
447                    updateList.add(new SLASummaryBean(slaCalc));
448                    jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
449                    LOG.trace("SLA Registration Event - Job:" + jobId);
450                    return true;
451                }
452                else {
453                    LOG.error(
454                            "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
455                            reg.getId());
456                }
457            }
458            catch (JPAExecutorException jpa) {
459                throw jpa;
460            }
461            return false;
462        }
463    
464        /**
465         * Remove job from being tracked in map
466         */
467        @Override
468        public void removeRegistration(String jobId) {
469            if (slaMap.remove(jobId) == null) {
470                historySet.remove(jobId);
471            }
472        }
473    
474        /**
475         * Triggered after receiving Job status change event, update SLA status
476         * accordingly
477         */
478        @Override
479        public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
480                Date endTime) throws JPAExecutorException, ServiceException {
481            SLACalcStatus slaCalc = slaMap.get(jobId);
482            SLASummaryBean slaInfo = null;
483            boolean hasSla = false;
484            if (slaCalc != null) {
485                synchronized (slaCalc) {
486                    slaCalc.setJobStatus(jobStatus);
487                    switch (jobEventStatus) {
488                        case STARTED:
489                            slaInfo = processJobStartSLA(slaCalc, startTime);
490                            break;
491                        case SUCCESS:
492                            slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
493                            break;
494                        case FAILURE:
495                            slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
496                            break;
497                        default:
498                            LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
499                            slaInfo = getSLASummaryBean(slaCalc);
500                    }
501    
502                    if (slaCalc.getEventProcessed() == 7) {
503                        slaInfo.setEventProcessed(8);
504                        slaMap.remove(jobId);
505                    }
506                    hasSla = true;
507                }
508                LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
509            }
510            else if (historySet.contains(jobId)) {
511                slaInfo = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
512                if (slaInfo == null) {
513                    throw new JPAExecutorException(ErrorCode.E0604, jobId);
514                }
515                slaInfo.setJobStatus(jobStatus);
516                slaInfo.setActualStart(startTime);
517                slaInfo.setActualEnd(endTime);
518                if (endTime != null) {
519                    slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
520                }
521                slaInfo.setEventProcessed(8);
522                historySet.remove(jobId);
523                hasSla = true;
524            }
525    
526            if (hasSla) {
527                jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaInfo));
528            }
529            return hasSla;
530        }
531    
532        /**
533         * Process SLA for jobs that started running. Also update actual-start time
534         *
535         * @param slaCalc
536         * @param actualStart
537         * @return SLASummaryBean
538         */
539        private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) {
540            slaCalc.setActualStart(actualStart);
541            if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
542                slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
543            }
544            SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
545            Date expecStart = reg.getExpectedStart();
546            byte eventProc = slaCalc.getEventProcessed();
547            // set event proc here
548            if (((eventProc & 1) == 0)) {
549                if (expecStart != null) {
550                    if (actualStart.getTime() > expecStart.getTime()) {
551                        slaCalc.setEventStatus(EventStatus.START_MISS);
552                    }
553                    else {
554                        slaCalc.setEventStatus(EventStatus.START_MET);
555                    }
556                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
557                }
558                eventProc += 1;
559                slaCalc.setEventProcessed(eventProc);
560            }
561            return getSLASummaryBean(slaCalc);
562        }
563    
564        /**
565         * Process SLA for jobs that ended successfully. Also update actual-start
566         * and end time
567         *
568         * @param slaCalc
569         * @param actualStart
570         * @param actualEnd
571         * @return SLASummaryBean
572         * @throws JPAExecutorException
573         */
574        private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
575            SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
576            slaCalc.setActualStart(actualStart);
577            slaCalc.setActualEnd(actualEnd);
578            long expectedDuration = reg.getExpectedDuration();
579            long actualDuration = actualEnd.getTime() - actualStart.getTime();
580            slaCalc.setActualDuration(actualDuration);
581            //check event proc
582            byte eventProc = slaCalc.getEventProcessed();
583            if (((eventProc >> 1) & 1) == 0) {
584                processDurationSLA(expectedDuration, actualDuration, slaCalc);
585                eventProc += 2;
586                slaCalc.setEventProcessed(eventProc);
587            }
588    
589            if (eventProc < 4) {
590                Date expectedEnd = reg.getExpectedEnd();
591                if (actualEnd.getTime() > expectedEnd.getTime()) {
592                    slaCalc.setEventStatus(EventStatus.END_MISS);
593                    slaCalc.setSLAStatus(SLAStatus.MISS);
594                }
595                else {
596                    slaCalc.setEventStatus(EventStatus.END_MET);
597                    slaCalc.setSLAStatus(SLAStatus.MET);
598                }
599                eventProc += 4;
600                slaCalc.setEventProcessed(eventProc);
601                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
602            }
603            return getSLASummaryBean(slaCalc);
604        }
605    
606        /**
607         * Process SLA for jobs that ended in failure. Also update actual-start and
608         * end time
609         *
610         * @param slaCalc
611         * @param actualStart
612         * @param actualEnd
613         * @return SLASummaryBean
614         * @throws JPAExecutorException
615         */
616        private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
617            slaCalc.setActualStart(actualStart);
618            slaCalc.setActualEnd(actualEnd);
619            if (actualStart == null) { // job failed before starting
620                if (slaCalc.getEventProcessed() < 4) {
621                    slaCalc.setEventStatus(EventStatus.END_MISS);
622                    slaCalc.setSLAStatus(SLAStatus.MISS);
623                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
624                    slaCalc.setEventProcessed(7);
625                    return getSLASummaryBean(slaCalc);
626                }
627            }
628            SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
629            long expectedDuration = reg.getExpectedDuration();
630            long actualDuration = actualEnd.getTime() - actualStart.getTime();
631            slaCalc.setActualDuration(actualDuration);
632    
633            byte eventProc = slaCalc.getEventProcessed();
634            if (((eventProc >> 1) & 1) == 0) {
635                if (expectedDuration != -1) {
636                    slaCalc.setEventStatus(EventStatus.DURATION_MISS);
637                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
638                }
639                eventProc += 2;
640                slaCalc.setEventProcessed(eventProc);
641            }
642            if (eventProc < 4) {
643                slaCalc.setEventStatus(EventStatus.END_MISS);
644                slaCalc.setSLAStatus(SLAStatus.MISS);
645                eventProc += 4;
646                slaCalc.setEventProcessed(eventProc);
647                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
648            }
649            return getSLASummaryBean(slaCalc);
650        }
651    
652        private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) {
653            SLASummaryBean slaSummaryBean = new SLASummaryBean();
654            slaSummaryBean.setActualStart(slaCalc.getActualStart());
655            slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
656            slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
657            slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
658            slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
659            slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
660            slaSummaryBean.setId(slaCalc.getId());
661            slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
662            return slaSummaryBean;
663        }
664    
665        private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
666            if (expected != -1 && actual > expected) {
667                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
668                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
669            }
670            else if (expected != -1 && actual <= expected) {
671                slaCalc.setEventStatus(EventStatus.DURATION_MET);
672                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
673            }
674        }
675    
676        /*
677         * Confirm alerts against source of truth - DB. Also required in case of High Availability
678         */
679        private void confirmWithDB(SLACalcStatus slaCalc) {
680            boolean ended = false, isEndMiss = false;
681            try {
682                switch (slaCalc.getAppType()) {
683                    case WORKFLOW_JOB:
684                        WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId()));
685                        if (wf.getEndTime() != null) {
686                            ended = true;
687                            if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
688                                    || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
689                                isEndMiss = true;
690                            }
691                        }
692                        slaCalc.setActualStart(wf.getStartTime());
693                        slaCalc.setActualEnd(wf.getEndTime());
694                        slaCalc.setJobStatus(wf.getStatusStr());
695                        break;
696                    case WORKFLOW_ACTION:
697                        WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId()));
698                        if (wa.getEndTime() != null) {
699                            ended = true;
700                            if (wa.isTerminalWithFailure()
701                                    || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
702                                isEndMiss = true;
703                            }
704                        }
705                        slaCalc.setActualStart(wa.getStartTime());
706                        slaCalc.setActualEnd(wa.getEndTime());
707                        slaCalc.setJobStatus(wa.getStatusStr());
708                        break;
709                    case COORDINATOR_ACTION:
710                        CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
711                        if (ca.isTerminalWithFailure()) {
712                            isEndMiss = ended = true;
713                            slaCalc.setActualStart(null);
714                            slaCalc.setActualEnd(ca.getLastModifiedTime());
715                        }
716                        if (ca.getExternalId() != null) {
717                            wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId()));
718                            if (wf.getEndTime() != null) {
719                                ended = true;
720                                if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
721                                    isEndMiss = true;
722                                }
723                            }
724                            slaCalc.setActualEnd(wf.getEndTime());
725                            slaCalc.setActualStart(wf.getStartTime());
726                        }
727                        slaCalc.setJobStatus(ca.getStatusStr());
728                        break;
729                    default:
730                        LOG.debug("Unsupported App-type for SLA - " + slaCalc.getAppType());
731                }
732    
733                byte eventProc = slaCalc.getEventProcessed();
734                if (ended) {
735                    if (isEndMiss) {
736                        slaCalc.setSLAStatus(SLAStatus.MISS);
737                    }
738                    else {
739                        slaCalc.setSLAStatus(SLAStatus.MET);
740                    }
741                    if (slaCalc.getActualStart() != null) {
742                        if ((eventProc & 1) == 0) {
743                            if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
744                                slaCalc.setEventStatus(EventStatus.START_MISS);
745                            }
746                            else {
747                                slaCalc.setEventStatus(EventStatus.START_MET);
748                            }
749                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
750                        }
751                        slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
752                        if (((eventProc >> 1) & 1) == 0) {
753                            processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
754                        }
755                    }
756                    if (eventProc < 4) {
757                        if (isEndMiss) {
758                            slaCalc.setEventStatus(EventStatus.END_MISS);
759                        }
760                        else {
761                            slaCalc.setEventStatus(EventStatus.END_MET);
762                        }
763                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
764                    }
765                    slaCalc.setEventProcessed(8);
766                }
767                else {
768                    if (slaCalc.getActualStart() != null) {
769                        slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
770                    }
771                    if ((eventProc & 1) == 0) {
772                        if (slaCalc.getActualStart() != null) {
773                            if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
774                                slaCalc.setEventStatus(EventStatus.START_MISS);
775                            }
776                            else {
777                                slaCalc.setEventStatus(EventStatus.START_MET);
778                            }
779                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
780                            eventProc++;
781                        }
782                        else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
783                            slaCalc.setEventStatus(EventStatus.START_MISS);
784                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
785                            eventProc++;
786                        }
787                    }
788                    if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != null
789                            && slaCalc.getExpectedDuration() != -1) {
790                        if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
791                            slaCalc.setEventStatus(EventStatus.DURATION_MISS);
792                            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
793                            eventProc += 2;
794                        }
795                    }
796                    if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
797                        slaCalc.setEventStatus(EventStatus.END_MISS);
798                        slaCalc.setSLAStatus(SLAStatus.MISS);
799                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
800                        eventProc += 4;
801                    }
802                    slaCalc.setEventProcessed(eventProc);
803                }
804            }
805            catch (Exception e) {
806                LOG.warn("Error while confirming SLA against DB for jobid= " + slaCalc.getId() + ". Exception is "
807                        + e.getClass().getName() + ": " + e.getMessage());
808                if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
809                    slaCalc.setEventStatus(EventStatus.END_MISS);
810                    slaCalc.setSLAStatus(SLAStatus.MISS);
811                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
812                    slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4);
813                }
814            }
815        }
816    
817    }