001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.sla;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Date;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.oozie.AppType;
033import org.apache.oozie.CoordinatorActionBean;
034import org.apache.oozie.CoordinatorJobBean;
035import org.apache.oozie.ErrorCode;
036import org.apache.oozie.WorkflowActionBean;
037import org.apache.oozie.WorkflowJobBean;
038import org.apache.oozie.XException;
039import org.apache.oozie.client.CoordinatorAction;
040import org.apache.oozie.client.WorkflowAction;
041import org.apache.oozie.client.WorkflowJob;
042import org.apache.oozie.client.event.JobEvent;
043import org.apache.oozie.client.event.SLAEvent.EventStatus;
044import org.apache.oozie.client.event.SLAEvent.SLAStatus;
045import org.apache.oozie.client.rest.JsonBean;
046import org.apache.oozie.executor.jpa.BatchQueryExecutor;
047import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
048import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
049import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
050import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
051import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
052import org.apache.oozie.executor.jpa.JPAExecutorException;
053import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
054import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
055import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
056import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
057import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
058import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
059import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
060import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
061import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
062import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
063import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
064import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
065import org.apache.oozie.lock.LockToken;
066import org.apache.oozie.service.EventHandlerService;
067import org.apache.oozie.service.JPAService;
068import org.apache.oozie.service.JobsConcurrencyService;
069import org.apache.oozie.service.MemoryLocksService;
070import org.apache.oozie.service.SchedulerService;
071import org.apache.oozie.service.ServiceException;
072import org.apache.oozie.service.Services;
073import org.apache.oozie.sla.service.SLAService;
074import org.apache.oozie.util.DateUtils;
075import org.apache.oozie.util.LogUtils;
076import org.apache.oozie.util.XLog;
077
078import com.google.common.annotations.VisibleForTesting;
079
080
081/**
082 * Implementation class for SLACalculator that calculates SLA related to
083 * start/end/duration of jobs using a memory-based map
084 */
085public class SLACalculatorMemory implements SLACalculator {
086
    // Logger shared by this calculator and its inner purge worker.
    private static XLog LOG = XLog.getLog(SLACalculatorMemory.class);
    // TODO optimization priority based insertion/processing/bumping up-down
    // Jobs whose SLA is still being evaluated, keyed by job id (a ConcurrentHashMap, created in init()).
    protected Map<String, SLACalcStatus> slaMap;
    // Ids of jobs whose event-processed value reached 7; HistoryPurgeWorker drops them once the job has finished.
    protected Set<String> historySet;
    // Maximum number of entries accepted into slaMap by new registrations (SLAService.CONF_CAPACITY, default 5000).
    private static int capacity;
    private static JPAService jpaService;
    // Sink for the SLA events (e.g. START_MISS, DURATION_MISS) queued by updateJobSla().
    protected EventHandlerService eventHandler;
    // Argument handed to SLASummaryGetRecordsOnRestartJPAExecutor on restart
    // (SLAService.CONF_EVENTS_MODIFIED_AFTER, default 7) - presumably a number of days; confirm against the executor.
    private static int modifiedAfter;
    // Grace period in milliseconds added to expected start/duration/end before a miss is considered
    // (SLAService.CONF_JOB_EVENT_LATENCY, default 90 * 1000).
    private static long jobEventLatency;
096
097    @Override
098    public void init(Configuration conf) throws ServiceException {
099        capacity = conf.getInt(SLAService.CONF_CAPACITY, 5000);
100        jobEventLatency = conf.getInt(SLAService.CONF_JOB_EVENT_LATENCY, 90 * 1000);
101        slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
102        historySet = Collections.synchronizedSet(new HashSet<String>());
103        jpaService = Services.get().get(JPAService.class);
104        eventHandler = Services.get().get(EventHandlerService.class);
105        // load events modified after
106        modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
107        loadOnRestart();
108        Runnable purgeThread = new HistoryPurgeWorker();
109        // schedule runnable by default 1 day
110        Services.get()
111                .get(SchedulerService.class)
112                .schedule(purgeThread, 86400, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 86400),
113                        SchedulerService.Unit.SEC);
114    }
115
116    public class HistoryPurgeWorker implements Runnable {
117
118        public HistoryPurgeWorker() {
119        }
120
121        @Override
122        public void run() {
123            if (Thread.currentThread().isInterrupted()) {
124                return;
125            }
126            Iterator<String> jobItr = historySet.iterator();
127            while (jobItr.hasNext()) {
128                String jobId = jobItr.next();
129
130                if (jobId.endsWith("-W")) {
131                    WorkflowJobBean wfJob = null;
132                    try {
133                        wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId);
134                    }
135                    catch (JPAExecutorException e) {
136                        if (e.getErrorCode().equals(ErrorCode.E0604)) {
137                            jobItr.remove();
138                        }
139                        else {
140                            LOG.info("Failed to fetch the workflow job: " + jobId, e);
141                        }
142                    }
143                    if (wfJob != null && wfJob.inTerminalState()) {
144                        try {
145                            updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime());
146                            jobItr.remove();
147                        }
148                        catch (JPAExecutorException e) {
149                            LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
150                        }
151
152                    }
153                }
154                else if (jobId.contains("-W@")) {
155                    WorkflowActionBean wfAction = null;
156                    try {
157                        wfAction = WorkflowActionQueryExecutor.getInstance().get(
158                                WorkflowActionQuery.GET_ACTION_COMPLETED, jobId);
159                    }
160                    catch (JPAExecutorException e) {
161                        if (e.getErrorCode().equals(ErrorCode.E0605)) {
162                            jobItr.remove();
163                        }
164                        else {
165                            LOG.info("Failed to fetch the workflow action: " + jobId, e);
166                        }
167                    }
168                    if (wfAction != null && (wfAction.isComplete() || wfAction.isTerminalWithFailure())) {
169                        try {
170                            updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime());
171                            jobItr.remove();
172                        }
173                        catch (JPAExecutorException e) {
174                            LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
175                        }
176                    }
177                }
178                else if (jobId.contains("-C@")) {
179                    CoordinatorActionBean cAction = null;
180                    try {
181                        cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId);
182                    }
183                    catch (JPAExecutorException e) {
184                        if (e.getErrorCode().equals(ErrorCode.E0605)) {
185                            jobItr.remove();
186                        }
187                        else {
188                            LOG.info("Failed to fetch the coord action: " + jobId, e);
189                        }
190                    }
191                    if (cAction != null && cAction.isTerminalStatus()) {
192                        try {
193                            updateSLASummaryForCoordAction(cAction);
194                            jobItr.remove();
195                        }
196                        catch (JPAExecutorException e) {
197                            XLog.getLog(SLACalculatorMemory.class).info(
198                                    "Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
199                        }
200
201                    }
202                }
203                else if (jobId.endsWith("-C")) {
204                    CoordinatorJobBean cJob = null;
205                    try {
206                        cJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID,
207                                jobId);
208                    }
209                    catch (JPAExecutorException e) {
210                        if (e.getErrorCode().equals(ErrorCode.E0604)) {
211                            jobItr.remove();
212                        }
213                        else {
214                            LOG.info("Failed to fetch the coord job: " + jobId, e);
215                        }
216                    }
217                    if (cJob != null && cJob.isTerminalStatus()) {
218                        try {
219                            updateSLASummary(cJob.getId(), cJob.getStartTime(), cJob.getEndTime());
220                            jobItr.remove();
221                        }
222                        catch (JPAExecutorException e) {
223                            LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e);
224                        }
225
226                    }
227                }
228            }
229        }
230
231        private void updateSLASummary(String id, Date startTime, Date endTime) throws JPAExecutorException {
232            SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
233            if (sla != null) {
234                sla.setActualStart(startTime);
235                sla.setActualEnd(endTime);
236                if (startTime != null && endTime != null) {
237                    sla.setActualDuration(endTime.getTime() - startTime.getTime());
238                }
239                sla.setLastModifiedTime(new Date());
240                sla.setEventProcessed(8);
241                SLASummaryQueryExecutor.getInstance().executeUpdate(
242                        SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, sla);
243            }
244        }
245
246        private void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException {
247            String wrkflowId = bean.getExternalId();
248            if (wrkflowId != null) {
249                WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get(
250                        WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId);
251                if (wrkflow != null) {
252                    updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime());
253                }
254            }
255        }
256    }
257
258    private void loadOnRestart() {
259        boolean isJobModified = false;
260        try {
261            long slaPendingCount = 0;
262            long statusPendingCount = 0;
263            List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
264                    modifiedAfter));
265            for (SLASummaryBean summaryBean : summaryBeans) {
266                String jobId = summaryBean.getId();
267                LockToken lock = null;
268                switch (summaryBean.getAppType()) {
269                    case COORDINATOR_ACTION:
270                        isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
271                        break;
272                    case WORKFLOW_ACTION:
273                        isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
274                        break;
275                    case WORKFLOW_JOB:
276                        isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
277                        break;
278                    default:
279                        break;
280                }
281                if (isJobModified) {
282                    try {
283                        boolean update = true;
284                        if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
285                            lock = Services
286                                    .get()
287                                    .get(MemoryLocksService.class)
288                                    .getWriteLock(
289                                            SLACalcStatus.SLA_ENTITYKEY_PREFIX + jobId,
290                                            Services.get().getConf()
291                                                    .getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000));
292                            if (lock == null) {
293                                update = false;
294                            }
295                        }
296                        if (update) {
297                            summaryBean.setLastModifiedTime(new Date());
298                            SLASummaryQueryExecutor.getInstance().executeUpdate(
299                                    SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
300                        }
301                    }
302                    catch (Exception e) {
303                        LOG.warn("Failed to load records for " + jobId, e);
304                    }
305                    finally {
306                        if (lock != null) {
307                            lock.release();
308                            lock = null;
309                        }
310                    }
311                }
312                try {
313                    if (summaryBean.getEventProcessed() == 7) {
314                        historySet.add(jobId);
315                        statusPendingCount++;
316                    }
317                    else if (summaryBean.getEventProcessed() <= 7) {
318                        SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
319                                SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
320                        SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
321                        slaMap.put(jobId, slaCalcStatus);
322                        slaPendingCount++;
323                    }
324                }
325                catch (Exception e) {
326                    LOG.warn("Failed to fetch/update records for " + jobId, e);
327                }
328
329            }
330            LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount);
331
332        }
333        catch (Exception e) {
334            LOG.warn("Failed to retrieve SLASummary records on restart", e);
335        }
336    }
337
338    private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId)
339            throws JPAExecutorException {
340        boolean isJobModified = false;
341        CoordinatorActionBean coordAction = null;
342        coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId));
343        if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
344            LOG.trace("Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is "
345                    + summaryBean.getJobStatus());
346            isJobModified = true;
347            summaryBean.setJobStatus(coordAction.getStatusStr());
348            if (coordAction.isTerminalStatus()) {
349                WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
350                        .getExternalId()));
351                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(),
352                        coordAction.getStatusStr());
353            }
354            else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) {
355                WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction
356                        .getExternalId()));
357                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
358            }
359        }
360        return isJobModified;
361    }
362
363    private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId)
364            throws JPAExecutorException {
365        boolean isJobModified = false;
366        WorkflowActionBean wfAction = null;
367        wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId));
368        if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
369            LOG.trace("Workflow action status is " + wfAction.getStatusStr() + "and summary bean status is "
370                    + summaryBean.getJobStatus());
371            isJobModified = true;
372            summaryBean.setJobStatus(wfAction.getStatusStr());
373            if (wfAction.inTerminalState()) {
374                setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
375            }
376            else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
377                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime());
378            }
379        }
380        return isJobModified;
381    }
382
383    private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId)
384            throws JPAExecutorException {
385        boolean isJobModified = false;
386        WorkflowJobBean wfJob = null;
387        wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
388        if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
389            LOG.trace("Workflow job status is " + wfJob.getStatusStr() + "and summary bean status is "
390                    + summaryBean.getJobStatus());
391            isJobModified = true;
392            summaryBean.setJobStatus(wfJob.getStatusStr());
393            if (wfJob.inTerminalState()) {
394                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
395            }
396            else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
397                setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
398            }
399        }
400        return isJobModified;
401    }
402
403    private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) {
404        byte eventProc = summaryBean.getEventProcessed();
405        summaryBean.setEventProcessed(8);
406        summaryBean.setActualStart(startTime);
407        summaryBean.setActualEnd(endTime);
408        long actualDuration = endTime.getTime() - startTime.getTime();
409        summaryBean.setActualDuration(actualDuration);
410        if (eventProc < 4) {
411            if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || status.equals(WorkflowAction.Status.OK.name())
412                    || status.equals(CoordinatorAction.Status.SUCCEEDED.name())) {
413                if (endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) {
414                    summaryBean.setSLAStatus(SLAStatus.MET);
415                }
416                else {
417                    summaryBean.setSLAStatus(SLAStatus.MISS);
418                }
419            }
420            else {
421                summaryBean.setSLAStatus(SLAStatus.MISS);
422            }
423        }
424
425    }
426
427    private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) {
428        if (((eventProc & 1) == 0)) {
429            eventProc += 1;
430            summaryBean.setEventProcessed(eventProc);
431        }
432        if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
433            summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
434        }
435        summaryBean.setActualStart(startTime);
436    }
437
    /**
     * @return the number of jobs currently held in the SLA map; the history set is not counted
     */
    @Override
    public int size() {
        return slaMap.size();
    }
442
443    @Override
444    public SLACalcStatus get(String jobId) throws JPAExecutorException {
445        SLACalcStatus memObj;
446        memObj = slaMap.get(jobId);
447        if (memObj == null && historySet.contains(jobId)) {
448            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId),
449                    SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
450        }
451        return memObj;
452    }
453
    /**
     * @return an iterator over the ids of the jobs currently held in the SLA map
     */
    @Override
    public Iterator<String> iterator() {
        return slaMap.keySet().iterator();
    }
458
    /**
     * @return true if the SLA map holds no jobs; the history set is not considered
     */
    @Override
    public boolean isEmpty() {
        return slaMap.isEmpty();
    }
463
    /**
     * Empties both the SLA map and the history set. Only the in-memory state is touched.
     */
    @Override
    public void clear() {
        slaMap.clear();
        historySet.clear();
    }
469
470    /**
471     * Invoked via periodic run, update the SLA for registered jobs
472     */
473    protected void updateJobSla(String jobId) throws Exception {
474        SLACalcStatus slaCalc = slaMap.get(jobId);
475        synchronized (slaCalc) {
476            boolean change = false;
477            // get eventProcessed on DB for validation in HA
478            Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue(
479                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
480            byte eventProc = ((Byte) eventProcObj).byteValue();
481            if (eventProc >= 7) {
482                if (eventProc == 7) {
483                    historySet.add(jobId);
484                }
485                slaMap.remove(jobId);
486                LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
487            }
488            else {
489                slaCalc.setEventProcessed(eventProc);
490                SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
491                // calculation w.r.t current time and status
492                if ((eventProc & 1) == 0) { // first bit (start-processed) unset
493                    if (reg.getExpectedStart() != null) {
494                        if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
495                            confirmWithDB(slaCalc);
496                            eventProc = slaCalc.getEventProcessed();
497                            if (eventProc != 8 && (eventProc & 1) == 0) {
498                                // Some DB exception
499                                slaCalc.setEventStatus(EventStatus.START_MISS);
500                                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
501                                eventProc++;
502                            }
503                            change = true;
504                        }
505                    }
506                    else {
507                        eventProc++; // disable further processing for optional start sla condition
508                        change = true;
509                    }
510                }
511                // check if second bit (duration-processed) is unset
512                if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
513                    if (reg.getExpectedDuration() == -1) {
514                        eventProc += 2;
515                        change = true;
516                    }
517                    else if (slaCalc.getActualStart() != null) {
518                        if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
519                                .getActualStart().getTime())) {
520                            slaCalc.setEventProcessed(eventProc);
521                            confirmWithDB(slaCalc);
522                            eventProc = slaCalc.getEventProcessed();
523                            if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
524                                // Some DB exception
525                                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
526                                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
527                                eventProc += 2;
528                            }
529                            change = true;
530                        }
531                    }
532                }
533                if (eventProc < 4) {
534                    if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
535                        slaCalc.setEventProcessed(eventProc);
536                        confirmWithDB(slaCalc);
537                        eventProc = slaCalc.getEventProcessed();
538                        change = true;
539                    }
540                }
541                if (change) {
542                    try {
543                        boolean locked = true;
544                        slaCalc.acquireLock();
545                        locked = slaCalc.isLocked();
546                        if (locked) {
547                            // no more processing, no transfer to history set
548                            if (slaCalc.getEventProcessed() >= 8) {
549                                eventProc = 8;
550                                // Should not be > 8. But to handle any corner cases
551                                slaCalc.setEventProcessed(8);
552                                slaMap.remove(jobId);
553                            }
554                            else {
555                                slaCalc.setEventProcessed(eventProc);
556                            }
557                            SLASummaryBean slaSummaryBean = new SLASummaryBean();
558                            slaSummaryBean.setId(slaCalc.getId());
559                            slaSummaryBean.setEventProcessed(eventProc);
560                            slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
561                            slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
562                            slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
563                            slaSummaryBean.setActualStart(slaCalc.getActualStart());
564                            slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
565                            slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
566                            slaSummaryBean.setLastModifiedTime(new Date());
567                            SLASummaryQueryExecutor.getInstance().executeUpdate(
568                                    SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean);
569                            if (eventProc == 7) {
570                                historySet.add(jobId);
571                                slaMap.remove(jobId);
572                                LOG.trace("Removed Job [{0}] from map after End-processed", jobId);
573                            }
574                        }
575                    }
576                    catch (InterruptedException e) {
577                        throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut());
578                    }
579                    finally {
580                        slaCalc.releaseLock();
581                    }
582                }
583            }
584        }
585    }
586
587    /**
588     * Periodically run by the SLAService worker threads to update SLA status by
589     * iterating through all the jobs in the map
590     */
591    @Override
592    public void updateAllSlaStatus() {
593        LOG.info("Running periodic SLA check");
594        Iterator<String> iterator = slaMap.keySet().iterator();
595        while (iterator.hasNext()) {
596            String jobId = iterator.next();
597            try {
598                LOG.trace("Processing SLA for jobid={0}", jobId);
599                updateJobSla(jobId);
600            }
601            catch (Exception e) {
602                setLogPrefix(jobId);
603                LOG.error("Exception in SLA processing for job [{0}]", jobId, e);
604                LogUtils.clearLogPrefix();
605            }
606        }
607    }
608
609    /**
610     * Register a new job into the map for SLA tracking
611     */
612    @Override
613    public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
614        try {
615            if (slaMap.size() < capacity) {
616                SLACalcStatus slaCalc = new SLACalcStatus(reg);
617                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
618                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
619                slaMap.put(jobId, slaCalc);
620                List<JsonBean> insertList = new ArrayList<JsonBean>();
621                final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc);
622                final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date());
623                reg.setCreatedTimestamp(currentTime);
624                summaryBean.setCreatedTimestamp(currentTime);
625                insertList.add(reg);
626                insertList.add(summaryBean);
627                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
628                LOG.trace("SLA Registration Event - Job:" + jobId);
629                return true;
630            }
631            else {
632                setLogPrefix(reg.getId());
633                LOG.error(
634                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
635                        reg.getId());
636                LogUtils.clearLogPrefix();
637            }
638        }
639        catch (JPAExecutorException jpa) {
640            throw jpa;
641        }
642        return false;
643    }
644
645    private String getJobStatus(AppType appType) {
646        String status = null;
647        switch (appType) {
648            case COORDINATOR_ACTION:
649                status = CoordinatorAction.Status.WAITING.name();
650                break;
651            case WORKFLOW_ACTION:
652                status = WorkflowAction.Status.PREP.name();
653                break;
654            case WORKFLOW_JOB:
655                status = WorkflowJob.Status.PREP.name();
656                break;
657            default:
658                break;
659        }
660        return status;
661    }
662
663    /**
664     * Update job into the map for SLA tracking
665     */
666    @Override
667    public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
668        try {
669            if (slaMap.size() < capacity) {
670                SLACalcStatus slaCalc = new SLACalcStatus(reg);
671                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
672                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
673                slaMap.put(jobId, slaCalc);
674                List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
675                updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg));
676                updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
677                        new SLASummaryBean(slaCalc)));
678                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
679                LOG.trace("SLA Registration Event - Job:" + jobId);
680                return true;
681            }
682            else {
683                setLogPrefix(reg.getId());
684                LOG.error(
685                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
686                        reg.getId());
687                LogUtils.clearLogPrefix();
688            }
689        }
690        catch (JPAExecutorException jpa) {
691            throw jpa;
692        }
693        return false;
694    }
695
696    /**
697     * Remove job from being tracked in map
698     */
699    @Override
700    public void removeRegistration(String jobId) {
701        if (slaMap.remove(jobId) == null) {
702            historySet.remove(jobId);
703        }
704    }
705
706    /**
707     * Triggered after receiving Job status change event, update SLA status
708     * accordingly
709     */
710    @Override
711    public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
712            Date endTime) throws JPAExecutorException, ServiceException {
713        SLACalcStatus slaCalc = slaMap.get(jobId);
714        SLASummaryBean slaInfo = null;
715        boolean hasSla = false;
716        if (slaCalc == null) {
717            if (historySet.contains(jobId)) {
718                slaInfo = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
719                if (slaInfo == null) {
720                    throw new JPAExecutorException(ErrorCode.E0604, jobId);
721                }
722                slaInfo.setJobStatus(jobStatus);
723                slaInfo.setActualStart(startTime);
724                slaInfo.setActualEnd(endTime);
725                if (endTime != null) {
726                    slaInfo.setActualDuration(endTime.getTime() - startTime.getTime());
727                }
728                slaInfo.setEventProcessed(8);
729                historySet.remove(jobId);
730                slaInfo.setLastModifiedTime(new Date());
731                SLASummaryQueryExecutor.getInstance().executeUpdate(
732                        SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
733                hasSla = true;
734            }
735            else if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
736                // jobid might not exist in slaMap in HA Setting
737                SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
738                        SLARegQuery.GET_SLA_REG_ALL, jobId);
739                if (slaRegBean != null) { // filter out jobs picked by SLA job event listener
740                                          // but not actually configured for SLA
741                    SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
742                            SLASummaryQuery.GET_SLA_SUMMARY, jobId);
743                    if (slaSummaryBean.getEventProcessed() < 7) {
744                        slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
745                        slaMap.put(jobId, slaCalc);
746                    }
747                }
748            }
749        }
750        if (slaCalc != null) {
751            synchronized (slaCalc) {
752                try {
753                    // only get ZK lock when multiple servers running
754                    boolean locked = true;
755                    slaCalc.acquireLock();
756                    locked = slaCalc.isLocked();
757                    if (locked) {
758                        // get eventProcessed on DB for validation in HA
759                        Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance())
760                                .getSingleValue(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId);
761                        byte eventProc = ((Byte) eventProcObj).byteValue();
762                        slaCalc.setEventProcessed(eventProc);
763                        slaCalc.setJobStatus(jobStatus);
764                        switch (jobEventStatus) {
765                            case STARTED:
766                                slaInfo = processJobStartSLA(slaCalc, startTime);
767                                break;
768                            case SUCCESS:
769                                slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime);
770                                break;
771                            case FAILURE:
772                                slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
773                                break;
774                            default:
775                                LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
776                                slaInfo = getSLASummaryBean(slaCalc);
777                        }
778                        if (slaCalc.getEventProcessed() == 7) {
779                            slaInfo.setEventProcessed(8);
780                            slaMap.remove(jobId);
781                        }
782                        slaInfo.setLastModifiedTime(new Date());
783                        SLASummaryQueryExecutor.getInstance().executeUpdate(
784                                SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
785                        hasSla = true;
786                    }
787                }
788                catch (InterruptedException e) {
789                    throw new ServiceException(ErrorCode.E0606, slaCalc.getEntityKey(), slaCalc.getLockTimeOut());
790                }
791                finally {
792                    slaCalc.releaseLock();
793                }
794            }
795            LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
796        }
797
798        return hasSla;
799    }
800
801    /**
802     * Process SLA for jobs that started running. Also update actual-start time
803     *
804     * @param slaCalc
805     * @param actualStart
806     * @return SLASummaryBean
807     */
808    private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) {
809        slaCalc.setActualStart(actualStart);
810        if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
811            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
812        }
813        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
814        Date expecStart = reg.getExpectedStart();
815        byte eventProc = slaCalc.getEventProcessed();
816        // set event proc here
817        if (((eventProc & 1) == 0)) {
818            if (expecStart != null) {
819                if (actualStart.getTime() > expecStart.getTime()) {
820                    slaCalc.setEventStatus(EventStatus.START_MISS);
821                }
822                else {
823                    slaCalc.setEventStatus(EventStatus.START_MET);
824                }
825                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
826            }
827            eventProc += 1;
828            slaCalc.setEventProcessed(eventProc);
829        }
830        return getSLASummaryBean(slaCalc);
831    }
832
833    /**
834     * Process SLA for jobs that ended successfully. Also update actual-start
835     * and end time
836     *
837     * @param slaCalc
838     * @param actualStart
839     * @param actualEnd
840     * @return SLASummaryBean
841     * @throws JPAExecutorException
842     */
843    private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
844        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
845        slaCalc.setActualStart(actualStart);
846        slaCalc.setActualEnd(actualEnd);
847        long expectedDuration = reg.getExpectedDuration();
848        long actualDuration = actualEnd.getTime() - actualStart.getTime();
849        slaCalc.setActualDuration(actualDuration);
850        //check event proc
851        byte eventProc = slaCalc.getEventProcessed();
852        if (((eventProc >> 1) & 1) == 0) {
853            processDurationSLA(expectedDuration, actualDuration, slaCalc);
854            eventProc += 2;
855            slaCalc.setEventProcessed(eventProc);
856        }
857
858        if (eventProc < 4) {
859            Date expectedEnd = reg.getExpectedEnd();
860            if (actualEnd.getTime() > expectedEnd.getTime()) {
861                slaCalc.setEventStatus(EventStatus.END_MISS);
862                slaCalc.setSLAStatus(SLAStatus.MISS);
863            }
864            else {
865                slaCalc.setEventStatus(EventStatus.END_MET);
866                slaCalc.setSLAStatus(SLAStatus.MET);
867            }
868            eventProc += 4;
869            slaCalc.setEventProcessed(eventProc);
870            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
871        }
872        return getSLASummaryBean(slaCalc);
873    }
874
875    /**
876     * Process SLA for jobs that ended in failure. Also update actual-start and
877     * end time
878     *
879     * @param slaCalc
880     * @param actualStart
881     * @param actualEnd
882     * @return SLASummaryBean
883     * @throws JPAExecutorException
884     */
885    private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
886        slaCalc.setActualStart(actualStart);
887        slaCalc.setActualEnd(actualEnd);
888        if (actualStart == null) { // job failed before starting
889            if (slaCalc.getEventProcessed() < 4) {
890                slaCalc.setEventStatus(EventStatus.END_MISS);
891                slaCalc.setSLAStatus(SLAStatus.MISS);
892                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
893                slaCalc.setEventProcessed(7);
894                return getSLASummaryBean(slaCalc);
895            }
896        }
897        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
898        long expectedDuration = reg.getExpectedDuration();
899        long actualDuration = actualEnd.getTime() - actualStart.getTime();
900        slaCalc.setActualDuration(actualDuration);
901
902        byte eventProc = slaCalc.getEventProcessed();
903        if (((eventProc >> 1) & 1) == 0) {
904            if (expectedDuration != -1) {
905                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
906                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
907            }
908            eventProc += 2;
909            slaCalc.setEventProcessed(eventProc);
910        }
911        if (eventProc < 4) {
912            slaCalc.setEventStatus(EventStatus.END_MISS);
913            slaCalc.setSLAStatus(SLAStatus.MISS);
914            eventProc += 4;
915            slaCalc.setEventProcessed(eventProc);
916            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
917        }
918        return getSLASummaryBean(slaCalc);
919    }
920
921    private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) {
922        SLASummaryBean slaSummaryBean = new SLASummaryBean();
923        slaSummaryBean.setActualStart(slaCalc.getActualStart());
924        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
925        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
926        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
927        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
928        slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
929        slaSummaryBean.setId(slaCalc.getId());
930        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
931        return slaSummaryBean;
932    }
933
934    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
935        if (expected != -1 && actual > expected) {
936            slaCalc.setEventStatus(EventStatus.DURATION_MISS);
937            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
938        }
939        else if (expected != -1 && actual <= expected) {
940            slaCalc.setEventStatus(EventStatus.DURATION_MET);
941            eventHandler.queueEvent(new SLACalcStatus(slaCalc));
942        }
943    }
944
945    /*
946     * Confirm alerts against source of truth - DB. Also required in case of High Availability
947     */
948    private void confirmWithDB(SLACalcStatus slaCalc) {
949        boolean ended = false, isEndMiss = false;
950        try {
951            switch (slaCalc.getAppType()) {
952                case WORKFLOW_JOB:
953                    WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId()));
954                    if (wf.getEndTime() != null) {
955                        ended = true;
956                        if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
957                                || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
958                            isEndMiss = true;
959                        }
960                    }
961                    slaCalc.setActualStart(wf.getStartTime());
962                    slaCalc.setActualEnd(wf.getEndTime());
963                    slaCalc.setJobStatus(wf.getStatusStr());
964                    break;
965                case WORKFLOW_ACTION:
966                    WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId()));
967                    if (wa.getEndTime() != null) {
968                        ended = true;
969                        if (wa.isTerminalWithFailure()
970                                || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
971                            isEndMiss = true;
972                        }
973                    }
974                    slaCalc.setActualStart(wa.getStartTime());
975                    slaCalc.setActualEnd(wa.getEndTime());
976                    slaCalc.setJobStatus(wa.getStatusStr());
977                    break;
978                case COORDINATOR_ACTION:
979                    CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
980                    if (ca.isTerminalWithFailure()) {
981                        isEndMiss = ended = true;
982                        slaCalc.setActualEnd(ca.getLastModifiedTime());
983                    }
984                    if (ca.getExternalId() != null) {
985                        wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId()));
986                        if (wf.getEndTime() != null) {
987                            ended = true;
988                            if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
989                                isEndMiss = true;
990                            }
991                        }
992                        slaCalc.setActualEnd(wf.getEndTime());
993                        slaCalc.setActualStart(wf.getStartTime());
994                    }
995                    slaCalc.setJobStatus(ca.getStatusStr());
996                    break;
997                default:
998                    LOG.debug("Unsupported App-type for SLA - " + slaCalc.getAppType());
999            }
1000
1001            byte eventProc = slaCalc.getEventProcessed();
1002            if (ended) {
1003                if (isEndMiss) {
1004                    slaCalc.setSLAStatus(SLAStatus.MISS);
1005                }
1006                else {
1007                    slaCalc.setSLAStatus(SLAStatus.MET);
1008                }
1009                if (slaCalc.getActualStart() != null) {
1010                    if ((eventProc & 1) == 0) {
1011                        if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
1012                            slaCalc.setEventStatus(EventStatus.START_MISS);
1013                        }
1014                        else {
1015                            slaCalc.setEventStatus(EventStatus.START_MET);
1016                        }
1017                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
1018                    }
1019                    slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
1020                    if (((eventProc >> 1) & 1) == 0) {
1021                        processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
1022                    }
1023                }
1024                if (eventProc < 4) {
1025                    if (isEndMiss) {
1026                        slaCalc.setEventStatus(EventStatus.END_MISS);
1027                    }
1028                    else {
1029                        slaCalc.setEventStatus(EventStatus.END_MET);
1030                    }
1031                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
1032                }
1033                slaCalc.setEventProcessed(8);
1034            }
1035            else {
1036                if (slaCalc.getActualStart() != null) {
1037                    slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
1038                }
1039                if ((eventProc & 1) == 0) {
1040                    if (slaCalc.getActualStart() != null) {
1041                        if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) {
1042                            slaCalc.setEventStatus(EventStatus.START_MISS);
1043                        }
1044                        else {
1045                            slaCalc.setEventStatus(EventStatus.START_MET);
1046                        }
1047                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
1048                        eventProc++;
1049                    }
1050                    else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
1051                        slaCalc.setEventStatus(EventStatus.START_MISS);
1052                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
1053                        eventProc++;
1054                    }
1055                }
1056                if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != null
1057                        && slaCalc.getExpectedDuration() != -1) {
1058                    if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
1059                        slaCalc.setEventStatus(EventStatus.DURATION_MISS);
1060                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
1061                        eventProc += 2;
1062                    }
1063                }
1064                if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
1065                    slaCalc.setEventStatus(EventStatus.END_MISS);
1066                    slaCalc.setSLAStatus(SLAStatus.MISS);
1067                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
1068                    eventProc += 4;
1069                }
1070                slaCalc.setEventProcessed(eventProc);
1071            }
1072        }
1073        catch (Exception e) {
1074            LOG.warn("Error while confirming SLA against DB for jobid= " + slaCalc.getId() + ". Exception is "
1075                    + e.getClass().getName() + ": " + e.getMessage());
1076            if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
1077                slaCalc.setEventStatus(EventStatus.END_MISS);
1078                slaCalc.setSLAStatus(SLAStatus.MISS);
1079                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
1080                slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4);
1081            }
1082        }
1083    }
1084
1085    @VisibleForTesting
1086    public boolean isJobIdInSLAMap(String jobId) {
1087        return this.slaMap.containsKey(jobId);
1088    }
1089
1090    @VisibleForTesting
1091    public boolean isJobIdInHistorySet(String jobId) {
1092        return this.historySet.contains(jobId);
1093    }
1094
    /**
     * Reassigns {@code LOG} to the logger returned by {@code LogUtils.setLogInfo} for the
     * given job id; the remaining two arguments are passed as {@code null}.
     * Callers in this file pair it with {@code LogUtils.clearLogPrefix()} after logging.
     *
     * @param jobId job id handed to {@code LogUtils.setLogInfo}
     */
    private void setLogPrefix(String jobId) {
        // NOTE(review): LOG is reassigned here; its declaration is not visible in this part of
        // the file - confirm that sharing it between threads is safe.
        LOG = LogUtils.setLogInfo(LOG, jobId, null, null);
    }
1098}