001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.sla;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.Date;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.oozie.AppType;
034import org.apache.oozie.ErrorCode;
035import org.apache.oozie.XException;
036import org.apache.oozie.client.CoordinatorAction;
037import org.apache.oozie.client.OozieClient;
038import org.apache.oozie.client.WorkflowAction;
039import org.apache.oozie.client.WorkflowJob;
040import org.apache.oozie.client.event.JobEvent;
041import org.apache.oozie.client.event.SLAEvent.SLAStatus;
042import org.apache.oozie.client.rest.JsonBean;
043import org.apache.oozie.client.rest.RestConstants;
044import org.apache.oozie.command.CommandException;
045import org.apache.oozie.executor.jpa.BatchQueryExecutor;
046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
047import org.apache.oozie.executor.jpa.JPAExecutorException;
048import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
049import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
050import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
051import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
052import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
053import org.apache.oozie.service.ConfigurationService;
054import org.apache.oozie.service.EventHandlerService;
055import org.apache.oozie.service.InstrumentationService;
056import org.apache.oozie.service.JPAService;
057import org.apache.oozie.service.SchedulerService;
058import org.apache.oozie.service.ServiceException;
059import org.apache.oozie.service.Services;
060import org.apache.oozie.sla.service.SLAService;
061
062import com.google.common.annotations.VisibleForTesting;
063import org.apache.oozie.util.DateUtils;
064import org.apache.oozie.util.Instrumentation;
065import org.apache.oozie.util.LogUtils;
066import org.apache.oozie.util.Pair;
067import org.apache.oozie.util.XLog;
068
069/**
070 * Implementation class for SLACalculator that calculates SLA related to
071 * start/end/duration of jobs using a memory-based map
072 */
073public class SLACalculatorMemory implements SLACalculator {
074
075    private static XLog LOG = XLog.getLog(SLACalculatorMemory.class);
076    // TODO optimization priority based insertion/processing/bumping up-down
077    protected Map<String, SLACalcStatus> slaMap;
078    protected Set<String> historySet;
079    private static int capacity;
080    private static JPAService jpaService;
081    protected EventHandlerService eventHandler;
082    private static int modifiedAfter;
083    private static long jobEventLatency;
084    private Instrumentation instrumentation;
085    public static final String INSTRUMENTATION_GROUP = "sla-calculator";
086    public static final String SLA_MAP = "sla-map";
087
088    @Override
089    public void init(Configuration conf) throws ServiceException {
090        capacity = ConfigurationService.getInt(conf, SLAService.CONF_CAPACITY);
091        jobEventLatency = ConfigurationService.getInt(conf, SLAService.CONF_JOB_EVENT_LATENCY);
092        slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
093        historySet = Collections.synchronizedSet(new HashSet<String>());
094        jpaService = Services.get().get(JPAService.class);
095        eventHandler = Services.get().get(EventHandlerService.class);
096        instrumentation = Services.get().get(InstrumentationService.class).get();
097        // load events modified after
098        modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
099        loadOnRestart();
100        Runnable purgeThread = new HistoryPurgeWorker();
101        // schedule runnable by default 1 hours
102        Services.get()
103                .get(SchedulerService.class)
104                .schedule(purgeThread, 3600, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 3600),
105                        SchedulerService.Unit.SEC);
106    }
107
108    public class HistoryPurgeWorker extends Thread {
109
110        public HistoryPurgeWorker() {
111        }
112
113        @Override
114        public void run() {
115            if (Thread.currentThread().isInterrupted()) {
116                return;
117            }
118            Iterator<String> jobItr = historySet.iterator();
119            while (jobItr.hasNext()) {
120                String jobId = jobItr.next();
121                LOG.debug(" Running HistoryPurgeWorker for " + jobId);
122                try {
123                    boolean isDone = SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call();
124                    if (isDone) {
125                        LOG.debug("[{0}] job is finished and processed. Removing from history");
126                        jobItr.remove();
127                    }
128                }
129                catch (CommandException e) {
130                    if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
131                        LOG.warn("Job is not found in db: " + jobId, e);
132                        jobItr.remove();
133                    }
134                    else {
135                        LOG.error("Failed to fetch the job: " + jobId, e);
136                    }
137                }
138            }
139        }
140    }
141
142    private void loadOnRestart() {
143        try {
144            List<SLASummaryBean> summaryBeans = jpaService
145                    .execute(new SLASummaryGetRecordsOnRestartJPAExecutor(modifiedAfter));
146            for (SLASummaryBean summaryBean : summaryBeans) {
147                String jobId = summaryBean.getId();
148                putAndIncrement(jobId, new SLACalcStatus(summaryBean));
149            }
150            LOG.info("Loaded {0} SLASummary object after restart", slaMap.size());
151        }
152        catch (Exception e) {
153            LOG.warn("Failed to retrieve SLASummary records on restart", e);
154        }
155    }
156
157    @Override
158    public int size() {
159        return slaMap.size();
160    }
161
162    @VisibleForTesting
163    public Set<String> getHistorySet(){
164        return historySet;
165    }
166
167    @Override
168    public SLACalcStatus get(String jobId) throws JPAExecutorException {
169        SLACalcStatus memObj;
170        memObj = slaMap.get(jobId);
171        if (memObj == null && historySet.contains(jobId)) {
172            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
173                    .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get(
174                    SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
175        }
176        return memObj;
177    }
178
179    /**
180     * Get SLACalcStatus from map if SLARegistration is not null, else create a new SLACalcStatus
181     * This function deosn't update  slaMap
182     * @param jobId
183     * @return SLACalcStatus returns SLACalcStatus from map if SLARegistration is not null,
184     * else create a new SLACalcStatus
185     * @throws JPAExecutorException
186     */
187    private SLACalcStatus getOrCreateSLACalcStatus(String jobId) throws JPAExecutorException {
188        SLACalcStatus memObj;
189        memObj = slaMap.get(jobId);
190        // if the request came from immediately after restart don't use map SLACalcStatus.
191        if (memObj == null || memObj.getSLARegistrationBean() == null) {
192            SLARegistrationBean registrationBean = SLARegistrationQueryExecutor.getInstance()
193                    .get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
194            SLASummaryBean summaryBean = memObj == null
195                    ? SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId)
196                    : memObj.getSLASummaryBean();
197            return new SLACalcStatus(summaryBean, registrationBean);
198        }
199        return memObj;
200    }
201
202    @Override
203    public Iterator<String> iterator() {
204        return slaMap.keySet().iterator();
205    }
206
207    @Override
208    public boolean isEmpty() {
209        return slaMap.isEmpty();
210    }
211
212    @Override
213    public void clear() {
214        final int originalSize = slaMap.size();
215        slaMap.clear();
216        historySet.clear();
217        instrumentation.decr(INSTRUMENTATION_GROUP, SLA_MAP, originalSize);
218    }
219
220    /**
221     * Invoked via periodic run, update the SLA for registered jobs
222     */
223    protected void updateJobSla(String jobId) throws Exception {
224        SLACalcStatus slaCalc = slaMap.get(jobId);
225
226        if (slaCalc == null) {
227            // job might be processed and removed from map by addJobStatus
228            return;
229        }
230        boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc);
231        // get eventProcessed on DB for validation in HA
232        SLASummaryBean summaryBean = null;
233        try {
234            summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance())
235                    .get(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
236        }
237        catch (JPAExecutorException e) {
238            if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
239                LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId);
240                removeAndDecrement(jobId);
241                return;
242            }
243            throw e;
244        }
245        byte eventProc = summaryBean.getEventProcessed();
246        slaCalc.setEventProcessed(eventProc);
247        if (eventProc >= 7) {
248            if (eventProc == 7) {
249                historySet.add(jobId);
250            }
251            removeAndDecrement(jobId);
252            LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
253        }
254        else {
255            if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
256                // Update last modified time.
257                slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
258                reloadExpectedTimeAndConfig(slaCalc);
259                LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
260            }
261            if (firstCheckAfterRetstart || isChanged(slaCalc)) {
262                LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(),
263                        slaCalc.getEventProcessed(), slaCalc.getJobStatus());
264                try {
265                    SLAXCommandFactory.getSLAEventXCommand(slaCalc).call();
266                    checkEventProc(slaCalc);
267                }
268                catch (XException e) {
269                    if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
270                        LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId());
271                        removeAndDecrement(jobId);
272                    }
273                    else {
274                        if (firstCheckAfterRetstart) {
275                            slaCalc.setSLARegistrationBean(null);
276                        }
277                    }
278                }
279            }
280        }
281    }
282
283    private boolean isChanged(SLACalcStatus slaCalc) {
284        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
285        byte eventProc = slaCalc.getEventProcessed();
286
287        if ((eventProc & 1) == 0) { // first bit (start-processed) unset
288            if (reg.getExpectedStart() != null) {
289                if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
290                    return true;
291                }
292            }
293            else {
294                return true;
295            }
296        }
297        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
298            if (reg.getExpectedDuration() == -1) {
299                return true;
300            }
301            else if (slaCalc.getActualStart() != null) {
302                if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
303                        .getActualStart().getTime())) {
304                    return true;
305                }
306            }
307        }
308        if (eventProc < 4) {
309            if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
310                return true;
311            }
312        }
313        return false;
314    }
315
316    @SuppressWarnings("rawtypes")
317    private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) throws JPAExecutorException {
318        updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean()));
319        slaCalc.setLastModifiedTime(new Date());
320        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
321                new SLASummaryBean(slaCalc)));
322    }
323
324    @SuppressWarnings("rawtypes")
325    private void updateDBSlaExpectedValues(SLACalcStatus slaCalc, List<UpdateEntry> updateList)
326            throws JPAExecutorException {
327        slaCalc.setLastModifiedTime(new Date());
328        updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_EXPECTED_VALUE, slaCalc
329                .getSLARegistrationBean()));
330        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
331                new SLASummaryBean(slaCalc)));
332    }
333
334    @SuppressWarnings("rawtypes")
335    private void executeBatchQuery(List<UpdateEntry> updateList) throws JPAExecutorException {
336        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
337    }
338
339    /**
340     * Periodically run by the SLAService worker threads to update SLA status by
341     * iterating through all the jobs in the map
342     */
343    @Override
344    public void updateAllSlaStatus() {
345        LOG.info("Running periodic SLA check");
346        Iterator<String> iterator = slaMap.keySet().iterator();
347        while (iterator.hasNext()) {
348            String jobId = iterator.next();
349            try {
350                LOG.trace("Processing SLA for jobid={0}", jobId);
351                updateJobSla(jobId);
352            }
353            catch (Exception e) {
354                setLogPrefix(jobId);
355                LOG.error("Exception in SLA processing for job [{0}]", jobId, e);
356                LogUtils.clearLogPrefix();
357            }
358        }
359    }
360
361    /**
362     * Register a new job into the map for SLA tracking
363     * @return true if successful
364     */
365    @Override
366    public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
367        try {
368            if (slaMap.size() < capacity) {
369                SLACalcStatus slaCalc = new SLACalcStatus(reg);
370                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
371                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
372                putAndIncrement(jobId, slaCalc);
373                List<JsonBean> insertList = new ArrayList<JsonBean>();
374                final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc);
375                final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date());
376                reg.setCreatedTimestamp(currentTime);
377                summaryBean.setCreatedTimestamp(currentTime);
378                insertList.add(reg);
379                insertList.add(summaryBean);
380                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
381                LOG.trace("SLA Registration Event - Job:" + jobId);
382                return true;
383            }
384            else {
385                setLogPrefix(reg.getId());
386                LOG.error(
387                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
388                        reg.getId());
389                LogUtils.clearLogPrefix();
390            }
391        }
392        catch (JPAExecutorException jpa) {
393            throw jpa;
394        }
395        return false;
396    }
397
398    private String getJobStatus(AppType appType) {
399        String status = null;
400        switch (appType) {
401            case COORDINATOR_ACTION:
402                status = CoordinatorAction.Status.WAITING.name();
403                break;
404            case WORKFLOW_ACTION:
405                status = WorkflowAction.Status.PREP.name();
406                break;
407            case WORKFLOW_JOB:
408                status = WorkflowJob.Status.PREP.name();
409                break;
410            default:
411                break;
412        }
413        return status;
414    }
415
416    /**
417     * Update job into the map for SLA tracking
418     */
419    @Override
420    public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
421        try {
422            if (slaMap.size() < capacity) {
423                SLACalcStatus slaCalc = new SLACalcStatus(reg);
424                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
425                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
426                putAndIncrement(jobId, slaCalc);
427
428                @SuppressWarnings("rawtypes")
429                List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
430                updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg));
431                updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
432                        new SLASummaryBean(slaCalc)));
433                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
434                LOG.trace("SLA Registration Event - Job:" + jobId);
435                return true;
436            }
437            else {
438                setLogPrefix(reg.getId());
439                LOG.error(
440                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
441                        reg.getId());
442                LogUtils.clearLogPrefix();
443            }
444        }
445        catch (JPAExecutorException jpa) {
446            throw jpa;
447        }
448        return false;
449    }
450
451    /**
452     * Remove job from being tracked in map
453     */
454    @Override
455    public void removeRegistration(String jobId) {
456        if (!removeAndDecrement(jobId)) {
457            historySet.remove(jobId);
458        }
459    }
460
461    /**
462     * Triggered after receiving Job status change event, update SLA status
463     * accordingly
464     */
465    @Override
466    public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
467            Date endTime) throws JPAExecutorException, ServiceException {
468        LOG.debug(
469                "Received addJobStatus request for job  [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], "
470                        + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime);
471        SLACalcStatus slaCalc = slaMap.get(jobId);
472        boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc);
473        if (slaCalc == null) {
474            SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
475                    SLARegQuery.GET_SLA_REG_ALL, jobId);
476            if (slaRegBean != null) { // filter out jobs picked by SLA job event listener
477                                      // but not actually configured for SLA
478                SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
479                        SLASummaryQuery.GET_SLA_SUMMARY, jobId);
480                slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
481                putAndIncrement(jobId, slaCalc);
482            }
483        }
484        else {
485            SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
486                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
487            byte eventProc = summaryBean.getEventProcessed();
488            if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
489                // Update last modified time.
490                slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
491                reloadExpectedTimeAndConfig(slaCalc);
492                LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
493
494            }
495            slaCalc.setEventProcessed(eventProc);
496        }
497        if (slaCalc != null) {
498            try {
499                SLAXCommandFactory.getSLAEventXCommand(slaCalc,
500                        ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 1000)).call();
501                checkEventProc(slaCalc);
502            }
503            catch (XException e) {
504                if (firstCheckAfterRetstart) {
505                    slaCalc.setSLARegistrationBean(null);
506                }
507                LOG.error(e);
508                throw new ServiceException(e);
509            }
510            return true;
511        }
512        else {
513            return false;
514        }
515    }
516
517    private void checkEventProc(SLACalcStatus slaCalc){
518        byte eventProc = slaCalc.getEventProcessed();
519        if (slaCalc.getEventProcessed() >= 8) {
520            removeAndDecrement(slaCalc.getId());
521            LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId());
522        }
523        if (eventProc == 7) {
524            historySet.add(slaCalc.getId());
525            removeAndDecrement(slaCalc.getId());
526            LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId());
527        }
528    }
529
530    public void reloadExpectedTimeAndConfig(SLACalcStatus slaCalc) throws JPAExecutorException {
531        SLARegistrationBean regBean = SLARegistrationQueryExecutor.getInstance().get(
532                SLARegQuery.GET_SLA_EXPECTED_VALUE_CONFIG, slaCalc.getId());
533
534        if (regBean.getExpectedDuration() > 0) {
535            slaCalc.getSLARegistrationBean().setExpectedDuration(regBean.getExpectedDuration());
536        }
537        if (regBean.getExpectedEnd() != null) {
538            slaCalc.getSLARegistrationBean().setExpectedEnd(regBean.getExpectedEnd());
539        }
540        if (regBean.getExpectedStart() != null) {
541            slaCalc.getSLARegistrationBean().setExpectedStart(regBean.getExpectedStart());
542        }
543        if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) {
544            slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
545                    regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT));
546        }
547        if (regBean.getNominalTime() != null) {
548            slaCalc.getSLARegistrationBean().setNominalTime(regBean.getNominalTime());
549        }
550    }
551
552    @VisibleForTesting
553    public boolean isJobIdInSLAMap(String jobId) {
554        return this.slaMap.containsKey(jobId);
555    }
556
557    @VisibleForTesting
558    public boolean isJobIdInHistorySet(String jobId) {
559        return this.historySet.contains(jobId);
560    }
561
562    private void setLogPrefix(String jobId) {
563        LOG = LogUtils.setLogInfo(LOG, jobId, null, null);
564    }
565
566    @Override
567    public boolean enableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
568        boolean isJobFound = false;
569        @SuppressWarnings("rawtypes")
570        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
571        for (String jobId : jobIds) {
572            SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId);
573            slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT);
574            updateDBSlaConfig(slaCalc, updateList);
575            isJobFound = true;
576        }
577        executeBatchQuery(updateList);
578        return isJobFound;
579    }
580
581    @Override
582    public boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
583        return enableAlert(getSLAJobsforParents(parentJobIds));
584    }
585
586    @Override
587    public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
588        boolean isJobFound = false;
589        @SuppressWarnings("rawtypes")
590        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
591
592        for (String jobId : jobIds) {
593            SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId);
594            slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true));
595            updateDBSlaConfig(slaCalc, updateList);
596            isJobFound = true;
597        }
598        executeBatchQuery(updateList);
599        return isJobFound;
600    }
601
602    @Override
603    public boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
604        return disableAlert(getSLAJobsforParents(parentJobIds));
605    }
606
607    @Override
608    public boolean changeDefinition(List<Pair<String, Map<String, String>>> jobIdsSLAPair) throws JPAExecutorException,
609            ServiceException {
610        boolean isJobFound = false;
611        @SuppressWarnings("rawtypes")
612        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
613        for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) {
614            SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobIdSLAPair.getFirst());
615            updateParams(slaCalc, jobIdSLAPair.getSecond());
616            updateDBSlaExpectedValues(slaCalc, updateList);
617            isJobFound = true;
618        }
619        executeBatchQuery(updateList);
620        return isJobFound;
621    }
622
623    private void updateParams(SLACalcStatus slaCalc, Map<String, String> newParams) throws ServiceException {
624        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
625        if (newParams != null) {
626            try {
627                Date newNominal = SLAOperations.setNominalTime(newParams.get(RestConstants.SLA_NOMINAL_TIME), reg);
628                SLAOperations.setExpectedStart(newParams.get(RestConstants.SLA_SHOULD_START), newNominal, reg);
629                SLAOperations.setExpectedEnd(newParams.get(RestConstants.SLA_SHOULD_END), newNominal, reg);
630                SLAOperations.setExpectedDuration(newParams.get(RestConstants.SLA_MAX_DURATION), reg);
631            }
632            catch (CommandException ce) {
633                throw new ServiceException(ce);
634            }
635        }
636    }
637
638    private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException {
639        List<String> childJobIds = new ArrayList<String>();
640        for (String jobId : parentJobIds) {
641            List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList(
642                    SLARegQuery.GET_SLA_REG_FOR_PARENT_ID, jobId);
643            for (SLARegistrationBean bean : registrationBeanList) {
644                childJobIds.add(bean.getId());
645            }
646        }
647        return childJobIds;
648    }
649
650    private boolean checkAndUpdateSLACalcAfterRestart(SLACalcStatus slaCalc) throws JPAExecutorException {
651        if (slaCalc != null && slaCalc.getSLARegistrationBean() == null) {
652            return updateSLARegistartion(slaCalc);
653        }
654        return false;
655    }
656
657    public boolean updateSLARegistartion(SLACalcStatus slaCalc) throws JPAExecutorException {
658        if (slaCalc.getSLARegistrationBean() == null) {
659            synchronized (slaCalc) {
660                if (slaCalc.getSLARegistrationBean() == null) {
661                    SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance()
662                            .get(SLARegQuery.GET_SLA_REG_ON_RESTART, slaCalc.getId());
663                    slaCalc.updateSLARegistrationBean(slaRegBean);
664                    return true;
665                }
666            }
667        }
668        return false;
669    }
670
671    private boolean putAndIncrement(final String jobId, final SLACalcStatus newStatus) {
672        if (slaMap.put(jobId, newStatus) == null) {
673            LOG.trace("Added a new item to SLA map. [jobId={0}]", jobId);
674            instrumentation.incr(INSTRUMENTATION_GROUP, SLA_MAP, 1);
675            return true;
676        }
677
678        LOG.trace("Updated an existing item in SLA map. [jobId={0}]", jobId);
679        return false;
680    }
681
682    private boolean removeAndDecrement(final String jobId) {
683        if (slaMap.remove(jobId) != null) {
684            LOG.trace("Removed an existing item from SLA map. [jobId={0}]", jobId);
685            instrumentation.decr(INSTRUMENTATION_GROUP, SLA_MAP, 1);
686            return true;
687        }
688
689        LOG.trace("Tried to remove a non-existing item from SLA map. [jobId={0}]", jobId);
690        return false;
691    }
692}