001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.sla;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.Date;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.oozie.AppType;
034import org.apache.oozie.ErrorCode;
035import org.apache.oozie.XException;
036import org.apache.oozie.client.CoordinatorAction;
037import org.apache.oozie.client.OozieClient;
038import org.apache.oozie.client.WorkflowAction;
039import org.apache.oozie.client.WorkflowJob;
040import org.apache.oozie.client.event.JobEvent;
041import org.apache.oozie.client.event.SLAEvent.SLAStatus;
042import org.apache.oozie.client.rest.JsonBean;
043import org.apache.oozie.client.rest.RestConstants;
044import org.apache.oozie.command.CommandException;
045import org.apache.oozie.executor.jpa.BatchQueryExecutor;
046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
047import org.apache.oozie.executor.jpa.JPAExecutorException;
048import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
049import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
050import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
051import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
052import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
053import org.apache.oozie.service.ConfigurationService;
054import org.apache.oozie.service.EventHandlerService;
055import org.apache.oozie.service.JPAService;
056import org.apache.oozie.service.SchedulerService;
057import org.apache.oozie.service.ServiceException;
058import org.apache.oozie.service.Services;
059import org.apache.oozie.sla.service.SLAService;
060import org.apache.oozie.util.DateUtils;
061import org.apache.oozie.util.LogUtils;
062import org.apache.oozie.util.XLog;
063import org.apache.oozie.util.Pair;
064
065import com.google.common.annotations.VisibleForTesting;
066
067/**
068 * Implementation class for SLACalculator that calculates SLA related to
069 * start/end/duration of jobs using a memory-based map
070 */
071public class SLACalculatorMemory implements SLACalculator {
072
    // Class logger. Not final: setLogPrefix(String) reassigns this shared static reference.
    private static XLog LOG = XLog.getLog(SLACalculatorMemory.class);
    // TODO optimization priority based insertion/processing/bumping up-down
    // Job id -> in-memory SLA calculation state for the jobs currently being tracked.
    protected Map<String, SLACalcStatus> slaMap;
    // Ids of jobs whose eventProcessed value reached 7; HistoryPurgeWorker removes them later.
    protected Set<String> historySet;
    // Upper bound on slaMap.size() checked before a registration is added or updated.
    private static int capacity;
    // JPA service used to run the restart-time summary query.
    private static JPAService jpaService;
    // Event handler service looked up in init(); not otherwise used within this class.
    protected EventHandlerService eventHandler;
    // Argument for the restart query (defaults to 7) - presumably a number of days; TODO confirm.
    private static int modifiedAfter;
    // Grace period added to expected start/duration/end before an SLA change is assumed.
    // It is added to epoch-millisecond values in isChanged(), so it is in milliseconds.
    private static long jobEventLatency;
082
083    @Override
084    public void init(Configuration conf) throws ServiceException {
085        capacity = ConfigurationService.getInt(conf, SLAService.CONF_CAPACITY);
086        jobEventLatency = ConfigurationService.getInt(conf, SLAService.CONF_JOB_EVENT_LATENCY);
087        slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
088        historySet = Collections.synchronizedSet(new HashSet<String>());
089        jpaService = Services.get().get(JPAService.class);
090        eventHandler = Services.get().get(EventHandlerService.class);
091        // load events modified after
092        modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
093        loadOnRestart();
094        Runnable purgeThread = new HistoryPurgeWorker();
095        // schedule runnable by default 1 hours
096        Services.get()
097                .get(SchedulerService.class)
098                .schedule(purgeThread, 3600, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 3600),
099                        SchedulerService.Unit.SEC);
100    }
101
102    public class HistoryPurgeWorker extends Thread {
103
104        public HistoryPurgeWorker() {
105        }
106
107        @Override
108        public void run() {
109            if (Thread.currentThread().isInterrupted()) {
110                return;
111            }
112            Iterator<String> jobItr = historySet.iterator();
113            while (jobItr.hasNext()) {
114                String jobId = jobItr.next();
115                LOG.debug(" Running HistoryPurgeWorker for " + jobId);
116                try {
117                    boolean isDone = SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call();
118                    if (isDone) {
119                        LOG.debug("[{0}] job is finished and processed. Removing from history");
120                        jobItr.remove();
121                    }
122                }
123                catch (CommandException e) {
124                    if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
125                        LOG.warn("Job is not found in db: " + jobId, e);
126                        jobItr.remove();
127                    }
128                    else {
129                        LOG.error("Failed to fetch the job: " + jobId, e);
130                    }
131                }
132            }
133        }
134    }
135
136    private void loadOnRestart() {
137        long slaPendingCount = 0;
138        long statusPendingCount = 0;
139
140        try {
141            List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
142                    modifiedAfter));
143            for (SLASummaryBean summaryBean : summaryBeans) {
144                String jobId = summaryBean.getId();
145
146                SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
147                        SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
148                SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
149
150                // Processed missed jobs
151                try {
152                    SLAXCommandFactory.getSLAEventXCommand(slaCalcStatus).call();
153                }
154                catch (Throwable e) {
155                    LOG.error("Error while updating job {0}", slaCalcStatus.getId(), e);
156                }
157
158                if (slaCalcStatus.getEventProcessed() == 7) {
159                    historySet.add(jobId);
160                    statusPendingCount++;
161                    LOG.debug("Adding job [{0}] to historySet. EventProcessed is [{1}]", slaCalcStatus,
162                            slaCalcStatus);
163                }
164                else if (slaCalcStatus.getEventProcessed() < 7) {
165                    slaMap.put(jobId, slaCalcStatus);
166                    slaPendingCount++;
167                    LOG.debug("Adding job [{0}] to slamap. EventProcessed is [{1}]", slaCalcStatus,
168                            slaCalcStatus);
169
170                }
171            }
172            LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount);
173        }
174        catch (Exception e) {
175            LOG.warn("Failed to retrieve SLASummary records on restart", e);
176        }
177    }
178
179    @Override
180    public int size() {
181        return slaMap.size();
182    }
183
184    @VisibleForTesting
185    public Set<String> getHistorySet(){
186        return historySet;
187    }
188
189    @Override
190    public SLACalcStatus get(String jobId) throws JPAExecutorException {
191        SLACalcStatus memObj;
192        memObj = slaMap.get(jobId);
193        if (memObj == null && historySet.contains(jobId)) {
194            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
195                    .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get(
196                    SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
197        }
198        return memObj;
199    }
200
201    private SLACalcStatus getSLACalcStatus(String jobId) throws JPAExecutorException {
202        SLACalcStatus memObj;
203        memObj = slaMap.get(jobId);
204        if (memObj == null) {
205            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
206                    .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get(
207                    SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
208        }
209        return memObj;
210    }
211
212    @Override
213    public Iterator<String> iterator() {
214        return slaMap.keySet().iterator();
215    }
216
217    @Override
218    public boolean isEmpty() {
219        return slaMap.isEmpty();
220    }
221
222    @Override
223    public void clear() {
224        slaMap.clear();
225        historySet.clear();
226    }
227
228    /**
229     * Invoked via periodic run, update the SLA for registered jobs
230     */
231    protected void updateJobSla(String jobId) throws Exception {
232        SLACalcStatus slaCalc = slaMap.get(jobId);
233
234        if (slaCalc == null) {
235            // job might be processed and removed from map by addJobStatus
236            return;
237        }
238        synchronized (slaCalc) {
239            // get eventProcessed on DB for validation in HA
240            SLASummaryBean summaryBean = null;
241            try {
242                summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
243                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
244            }
245            catch (JPAExecutorException e) {
246                if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
247                    LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId);
248                    slaMap.remove(jobId);
249                    return;
250                }
251                throw e;
252            }
253            byte eventProc = summaryBean.getEventProcessed();
254            slaCalc.setEventProcessed(eventProc);
255            if (eventProc >= 7) {
256                if (eventProc == 7) {
257                    historySet.add(jobId);
258                }
259                slaMap.remove(jobId);
260                LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
261            }
262            else {
263                if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
264                    // Update last modified time.
265                    slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
266                    reloadExpectedTimeAndConfig(slaCalc);
267                    LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
268                }
269                if (isChanged(slaCalc)) {
270                    LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(),
271                            slaCalc.getEventProcessed(), slaCalc.getJobStatus());
272                    try {
273                        SLAXCommandFactory.getSLAEventXCommand(slaCalc).call();
274                        checkEventProc(slaCalc);
275                    }
276                    catch (XException e) {
277                        if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
278                            LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId());
279                            slaMap.remove(jobId);
280                        }
281                    }
282                }
283
284            }
285        }
286    }
287
288    private boolean isChanged(SLACalcStatus slaCalc) {
289        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
290        byte eventProc = slaCalc.getEventProcessed();
291
292        if ((eventProc & 1) == 0) { // first bit (start-processed) unset
293            if (reg.getExpectedStart() != null) {
294                if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
295                    return true;
296                }
297            }
298            else {
299                return true;
300            }
301        }
302        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
303            if (reg.getExpectedDuration() == -1) {
304                return true;
305            }
306            else if (slaCalc.getActualStart() != null) {
307                if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
308                        .getActualStart().getTime())) {
309                    return true;
310                }
311            }
312        }
313        if (eventProc < 4) {
314            if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
315                return true;
316            }
317        }
318        return false;
319    }
320
321    @SuppressWarnings("rawtypes")
322    private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) throws JPAExecutorException {
323        updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean()));
324        slaCalc.setLastModifiedTime(new Date());
325        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
326                new SLASummaryBean(slaCalc)));
327    }
328
329    @SuppressWarnings("rawtypes")
330    private void updateDBSlaExpectedValues(SLACalcStatus slaCalc, List<UpdateEntry> updateList)
331            throws JPAExecutorException {
332        slaCalc.setLastModifiedTime(new Date());
333        updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_EXPECTED_VALUE, slaCalc
334                .getSLARegistrationBean()));
335        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
336                new SLASummaryBean(slaCalc)));
337    }
338
339    @SuppressWarnings("rawtypes")
340    private void executeBatchQuery(List<UpdateEntry> updateList) throws JPAExecutorException {
341        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
342    }
343
344    /**
345     * Periodically run by the SLAService worker threads to update SLA status by
346     * iterating through all the jobs in the map
347     */
348    @Override
349    public void updateAllSlaStatus() {
350        LOG.info("Running periodic SLA check");
351        Iterator<String> iterator = slaMap.keySet().iterator();
352        while (iterator.hasNext()) {
353            String jobId = iterator.next();
354            try {
355                LOG.trace("Processing SLA for jobid={0}", jobId);
356                updateJobSla(jobId);
357            }
358            catch (Exception e) {
359                setLogPrefix(jobId);
360                LOG.error("Exception in SLA processing for job [{0}]", jobId, e);
361                LogUtils.clearLogPrefix();
362            }
363        }
364    }
365
366    /**
367     * Register a new job into the map for SLA tracking
368     */
369    @Override
370    public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
371        try {
372            if (slaMap.size() < capacity) {
373                SLACalcStatus slaCalc = new SLACalcStatus(reg);
374                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
375                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
376                slaMap.put(jobId, slaCalc);
377                List<JsonBean> insertList = new ArrayList<JsonBean>();
378                final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc);
379                final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date());
380                reg.setCreatedTimestamp(currentTime);
381                summaryBean.setCreatedTimestamp(currentTime);
382                insertList.add(reg);
383                insertList.add(summaryBean);
384                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
385                LOG.trace("SLA Registration Event - Job:" + jobId);
386                return true;
387            }
388            else {
389                setLogPrefix(reg.getId());
390                LOG.error(
391                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
392                        reg.getId());
393                LogUtils.clearLogPrefix();
394            }
395        }
396        catch (JPAExecutorException jpa) {
397            throw jpa;
398        }
399        return false;
400    }
401
402    private String getJobStatus(AppType appType) {
403        String status = null;
404        switch (appType) {
405            case COORDINATOR_ACTION:
406                status = CoordinatorAction.Status.WAITING.name();
407                break;
408            case WORKFLOW_ACTION:
409                status = WorkflowAction.Status.PREP.name();
410                break;
411            case WORKFLOW_JOB:
412                status = WorkflowJob.Status.PREP.name();
413                break;
414            default:
415                break;
416        }
417        return status;
418    }
419
420    /**
421     * Update job into the map for SLA tracking
422     */
423    @Override
424    public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
425        try {
426            if (slaMap.size() < capacity) {
427                SLACalcStatus slaCalc = new SLACalcStatus(reg);
428                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
429                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
430                slaMap.put(jobId, slaCalc);
431
432                @SuppressWarnings("rawtypes")
433                List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
434                updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg));
435                updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
436                        new SLASummaryBean(slaCalc)));
437                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
438                LOG.trace("SLA Registration Event - Job:" + jobId);
439                return true;
440            }
441            else {
442                setLogPrefix(reg.getId());
443                LOG.error(
444                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
445                        reg.getId());
446                LogUtils.clearLogPrefix();
447            }
448        }
449        catch (JPAExecutorException jpa) {
450            throw jpa;
451        }
452        return false;
453    }
454
455    /**
456     * Remove job from being tracked in map
457     */
458    @Override
459    public void removeRegistration(String jobId) {
460        if (slaMap.remove(jobId) == null) {
461            historySet.remove(jobId);
462        }
463    }
464
465    /**
466     * Triggered after receiving Job status change event, update SLA status
467     * accordingly
468     */
469    @Override
470    public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
471            Date endTime) throws JPAExecutorException, ServiceException {
472        LOG.debug(
473                "Received addJobStatus request for job  [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], "
474                        + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime);
475        SLACalcStatus slaCalc = slaMap.get(jobId);
476
477        if (slaCalc == null) {
478            SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
479                    SLARegQuery.GET_SLA_REG_ALL, jobId);
480            if (slaRegBean != null) { // filter out jobs picked by SLA job event listener
481                                      // but not actually configured for SLA
482                SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
483                        SLASummaryQuery.GET_SLA_SUMMARY, jobId);
484                slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
485                slaMap.put(jobId, slaCalc);
486            }
487        }
488        else {
489            SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
490                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
491            byte eventProc = summaryBean.getEventProcessed();
492            if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
493                // Update last modified time.
494                slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
495                reloadExpectedTimeAndConfig(slaCalc);
496                LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
497
498            }
499            slaCalc.setEventProcessed(eventProc);
500        }
501        if (slaCalc != null) {
502            try {
503                SLAXCommandFactory.getSLAEventXCommand(slaCalc,
504                        ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 1000)).call();
505                checkEventProc(slaCalc);
506            }
507            catch (XException e) {
508                LOG.error(e);
509                throw new ServiceException(e);
510            }
511            return true;
512        }
513        else {
514            return false;
515        }
516    }
517
518    private void checkEventProc(SLACalcStatus slaCalc){
519        byte eventProc = slaCalc.getEventProcessed();
520        if (slaCalc.getEventProcessed() >= 8) {
521            slaMap.remove(slaCalc.getId());
522            LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId());
523        }
524        if (eventProc == 7) {
525            historySet.add(slaCalc.getId());
526            slaMap.remove(slaCalc.getId());
527            LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId());
528        }
529    }
530
531    public void reloadExpectedTimeAndConfig(SLACalcStatus slaCalc) throws JPAExecutorException {
532        SLARegistrationBean regBean = SLARegistrationQueryExecutor.getInstance().get(
533                SLARegQuery.GET_SLA_EXPECTED_VALUE_CONFIG, slaCalc.getId());
534
535        if (regBean.getExpectedDuration() > 0) {
536            slaCalc.getSLARegistrationBean().setExpectedDuration(regBean.getExpectedDuration());
537        }
538        if (regBean.getExpectedEnd() != null) {
539            slaCalc.getSLARegistrationBean().setExpectedEnd(regBean.getExpectedEnd());
540        }
541        if (regBean.getExpectedStart() != null) {
542            slaCalc.getSLARegistrationBean().setExpectedStart(regBean.getExpectedStart());
543        }
544        if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) {
545            slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
546                    regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT));
547        }
548        if (regBean.getNominalTime() != null) {
549            slaCalc.getSLARegistrationBean().setNominalTime(regBean.getNominalTime());
550        }
551    }
552
553    @VisibleForTesting
554    public boolean isJobIdInSLAMap(String jobId) {
555        return this.slaMap.containsKey(jobId);
556    }
557
558    @VisibleForTesting
559    public boolean isJobIdInHistorySet(String jobId) {
560        return this.historySet.contains(jobId);
561    }
562
563    private void setLogPrefix(String jobId) {
564        LOG = LogUtils.setLogInfo(LOG, jobId, null, null);
565    }
566
567    @Override
568    public boolean enableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
569        boolean isJobFound = false;
570        @SuppressWarnings("rawtypes")
571        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
572        for (String jobId : jobIds) {
573            SLACalcStatus slaCalc = getSLACalcStatus(jobId);
574            if (slaCalc != null) {
575                slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT);
576                updateDBSlaConfig(slaCalc, updateList);
577                isJobFound = true;
578            }
579        }
580        executeBatchQuery(updateList);
581        return isJobFound;
582    }
583
584    @Override
585    public boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
586        return enableAlert(getSLAJobsforParents(parentJobIds));
587    }
588
589    @Override
590    public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
591        boolean isJobFound = false;
592        @SuppressWarnings("rawtypes")
593        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
594
595        for (String jobId : jobIds) {
596            SLACalcStatus slaCalc = getSLACalcStatus(jobId);
597            if (slaCalc != null) {
598                slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
599                        Boolean.toString(true));
600                updateDBSlaConfig(slaCalc, updateList);
601                isJobFound = true;
602            }
603        }
604        executeBatchQuery(updateList);
605        return isJobFound;
606    }
607
608    @Override
609    public boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
610        return disableAlert(getSLAJobsforParents(parentJobIds));
611    }
612
613    @Override
614    public boolean changeDefinition(List<Pair<String, Map<String, String>>> jobIdsSLAPair) throws JPAExecutorException,
615            ServiceException {
616        boolean isJobFound = false;
617        @SuppressWarnings("rawtypes")
618        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
619        for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) {
620            SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist());
621            if (slaCalc != null) {
622                updateParams(slaCalc, jobIdSLAPair.getSecond());
623                updateDBSlaExpectedValues(slaCalc, updateList);
624                isJobFound = true;
625            }
626        }
627        executeBatchQuery(updateList);
628        return isJobFound;
629    }
630
631    private void updateParams(SLACalcStatus slaCalc, Map<String, String> newParams) throws ServiceException {
632        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
633        if (newParams != null) {
634            try {
635                Date newNominal = SLAOperations.setNominalTime(newParams.get(RestConstants.SLA_NOMINAL_TIME), reg);
636                SLAOperations.setExpectedStart(newParams.get(RestConstants.SLA_SHOULD_START), newNominal, reg);
637                SLAOperations.setExpectedEnd(newParams.get(RestConstants.SLA_SHOULD_END), newNominal, reg);
638                SLAOperations.setExpectedDuration(newParams.get(RestConstants.SLA_MAX_DURATION), reg);
639            }
640            catch (CommandException ce) {
641                throw new ServiceException(ce);
642            }
643        }
644    }
645
646    private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException {
647        List<String> childJobIds = new ArrayList<String>();
648        for (String jobId : parentJobIds) {
649            List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList(
650                    SLARegQuery.GET_SLA_REG_FOR_PARENT_ID, jobId);
651            for (SLARegistrationBean bean : registrationBeanList) {
652                childJobIds.add(bean.getId());
653            }
654        }
655        return childJobIds;
656    }
657
658}