001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.sla;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.Date;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.oozie.AppType;
034import org.apache.oozie.ErrorCode;
035import org.apache.oozie.XException;
036import org.apache.oozie.client.CoordinatorAction;
037import org.apache.oozie.client.OozieClient;
038import org.apache.oozie.client.WorkflowAction;
039import org.apache.oozie.client.WorkflowJob;
040import org.apache.oozie.client.event.JobEvent;
041import org.apache.oozie.client.event.SLAEvent.SLAStatus;
042import org.apache.oozie.client.rest.JsonBean;
043import org.apache.oozie.client.rest.RestConstants;
044import org.apache.oozie.command.CommandException;
045import org.apache.oozie.executor.jpa.BatchQueryExecutor;
046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
047import org.apache.oozie.executor.jpa.JPAExecutorException;
048import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
049import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
050import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
051import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
052import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
053import org.apache.oozie.service.ConfigurationService;
054import org.apache.oozie.service.EventHandlerService;
055import org.apache.oozie.service.JPAService;
056import org.apache.oozie.service.SchedulerService;
057import org.apache.oozie.service.ServiceException;
058import org.apache.oozie.service.Services;
059import org.apache.oozie.sla.service.SLAService;
060import org.apache.oozie.util.DateUtils;
061import org.apache.oozie.util.LogUtils;
062import org.apache.oozie.util.XLog;
063import org.apache.oozie.util.Pair;
064
065import com.google.common.annotations.VisibleForTesting;
066
067/**
068 * Implementation class for SLACalculator that calculates SLA related to
069 * start/end/duration of jobs using a memory-based map
070 */
071public class SLACalculatorMemory implements SLACalculator {
072
    // Logger shared by all instances. NOTE: setLogPrefix() reassigns this static reference.
    private static XLog LOG = XLog.getLog(SLACalculatorMemory.class);
    // TODO optimization priority based insertion/processing/bumping up-down
    // In-memory SLA state of the jobs currently being tracked, keyed by job id.
    protected Map<String, SLACalcStatus> slaMap;
    // Ids of jobs taken out of slaMap with eventProcessed == 7; re-checked periodically by HistoryPurgeWorker.
    protected Set<String> historySet;
    // Upper bound on slaMap.size() enforced by addRegistration()/updateRegistration(); set in init().
    private static int capacity;
    // JPA service used by loadOnRestart(); looked up in init().
    private static JPAService jpaService;
    // Looked up in init(); not referenced anywhere else in this class.
    protected EventHandlerService eventHandler;
    // Argument handed to SLASummaryGetRecordsOnRestartJPAExecutor (default 7) - presumably a number of days, TODO confirm.
    private static int modifiedAfter;
    // Latency added to the expected start/duration/end in isChanged(); added to epoch milliseconds there.
    private static long jobEventLatency;
082
083    @Override
084    public void init(Configuration conf) throws ServiceException {
085        capacity = ConfigurationService.getInt(conf, SLAService.CONF_CAPACITY);
086        jobEventLatency = ConfigurationService.getInt(conf, SLAService.CONF_JOB_EVENT_LATENCY);
087        slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
088        historySet = Collections.synchronizedSet(new HashSet<String>());
089        jpaService = Services.get().get(JPAService.class);
090        eventHandler = Services.get().get(EventHandlerService.class);
091        // load events modified after
092        modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
093        loadOnRestart();
094        Runnable purgeThread = new HistoryPurgeWorker();
095        // schedule runnable by default 1 hours
096        Services.get()
097                .get(SchedulerService.class)
098                .schedule(purgeThread, 3600, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 3600),
099                        SchedulerService.Unit.SEC);
100    }
101
102    public class HistoryPurgeWorker extends Thread {
103
104        public HistoryPurgeWorker() {
105        }
106
107        @Override
108        public void run() {
109            if (Thread.currentThread().isInterrupted()) {
110                return;
111            }
112            Iterator<String> jobItr = historySet.iterator();
113            while (jobItr.hasNext()) {
114                String jobId = jobItr.next();
115                LOG.debug(" Running HistoryPurgeWorker for " + jobId);
116                try {
117                    boolean isDone = SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call();
118                    if (isDone) {
119                        LOG.debug("[{0}] job is finished and processed. Removing from history");
120                        jobItr.remove();
121                    }
122                }
123                catch (CommandException e) {
124                    if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
125                        LOG.warn("Job is not found in db: " + jobId, e);
126                        jobItr.remove();
127                    }
128                    else {
129                        LOG.error("Failed to fetch the job: " + jobId, e);
130                    }
131                }
132            }
133        }
134    }
135
136    private void loadOnRestart() {
137        try {
138            List<SLASummaryBean> summaryBeans = jpaService
139                    .execute(new SLASummaryGetRecordsOnRestartJPAExecutor(modifiedAfter));
140            for (SLASummaryBean summaryBean : summaryBeans) {
141                String jobId = summaryBean.getId();
142                slaMap.put(jobId, new SLACalcStatus(summaryBean));
143            }
144            LOG.info("Loaded {0} SLASummary object after restart", slaMap.size());
145        }
146        catch (Exception e) {
147            LOG.warn("Failed to retrieve SLASummary records on restart", e);
148        }
149    }
150
151    @Override
152    public int size() {
153        return slaMap.size();
154    }
155
156    @VisibleForTesting
157    public Set<String> getHistorySet(){
158        return historySet;
159    }
160
161    @Override
162    public SLACalcStatus get(String jobId) throws JPAExecutorException {
163        SLACalcStatus memObj;
164        memObj = slaMap.get(jobId);
165        if (memObj == null && historySet.contains(jobId)) {
166            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
167                    .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get(
168                    SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
169        }
170        return memObj;
171    }
172
173    /**
174     * Get SLACalcStatus from map if SLARegistration is not null, else create a new SLACalcStatus
175     * This function deosn't update  slaMap
176     * @param jobId
177     * @return
178     * @throws JPAExecutorException
179     */
180    private SLACalcStatus getOrCreateSLACalcStatus(String jobId) throws JPAExecutorException {
181        SLACalcStatus memObj;
182        memObj = slaMap.get(jobId);
183        // if the request came from immediately after restart don't use map SLACalcStatus.
184        if (memObj == null || memObj.getSLARegistrationBean() == null) {
185            SLARegistrationBean registrationBean = SLARegistrationQueryExecutor.getInstance()
186                    .get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
187            SLASummaryBean summaryBean = memObj == null
188                    ? SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId)
189                    : memObj.getSLASummaryBean();
190            return new SLACalcStatus(summaryBean, registrationBean);
191        }
192        return memObj;
193    }
194
195    @Override
196    public Iterator<String> iterator() {
197        return slaMap.keySet().iterator();
198    }
199
200    @Override
201    public boolean isEmpty() {
202        return slaMap.isEmpty();
203    }
204
205    @Override
206    public void clear() {
207        slaMap.clear();
208        historySet.clear();
209    }
210
211    /**
212     * Invoked via periodic run, update the SLA for registered jobs
213     */
214    protected void updateJobSla(String jobId) throws Exception {
215        SLACalcStatus slaCalc = slaMap.get(jobId);
216
217        if (slaCalc == null) {
218            // job might be processed and removed from map by addJobStatus
219            return;
220        }
221        boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc);
222        // get eventProcessed on DB for validation in HA
223        SLASummaryBean summaryBean = null;
224        try {
225            summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance())
226                    .get(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
227        }
228        catch (JPAExecutorException e) {
229            if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
230                LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId);
231                slaMap.remove(jobId);
232                return;
233            }
234            throw e;
235        }
236        byte eventProc = summaryBean.getEventProcessed();
237        slaCalc.setEventProcessed(eventProc);
238        if (eventProc >= 7) {
239            if (eventProc == 7) {
240                historySet.add(jobId);
241            }
242            slaMap.remove(jobId);
243            LOG.trace("Removed Job [{0}] from map as SLA processed", jobId);
244        }
245        else {
246            if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
247                // Update last modified time.
248                slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
249                reloadExpectedTimeAndConfig(slaCalc);
250                LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
251            }
252            if (firstCheckAfterRetstart || isChanged(slaCalc)) {
253                LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(),
254                        slaCalc.getEventProcessed(), slaCalc.getJobStatus());
255                try {
256                    SLAXCommandFactory.getSLAEventXCommand(slaCalc).call();
257                    checkEventProc(slaCalc);
258                }
259                catch (XException e) {
260                    if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
261                        LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId());
262                        slaMap.remove(jobId);
263                    }
264                    else {
265                        if (firstCheckAfterRetstart) {
266                            slaCalc.setSLARegistrationBean(null);
267                        }
268                    }
269                }
270            }
271        }
272    }
273
274    private boolean isChanged(SLACalcStatus slaCalc) {
275        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
276        byte eventProc = slaCalc.getEventProcessed();
277
278        if ((eventProc & 1) == 0) { // first bit (start-processed) unset
279            if (reg.getExpectedStart() != null) {
280                if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) {
281                    return true;
282                }
283            }
284            else {
285                return true;
286            }
287        }
288        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
289            if (reg.getExpectedDuration() == -1) {
290                return true;
291            }
292            else if (slaCalc.getActualStart() != null) {
293                if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc
294                        .getActualStart().getTime())) {
295                    return true;
296                }
297            }
298        }
299        if (eventProc < 4) {
300            if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) {
301                return true;
302            }
303        }
304        return false;
305    }
306
307    @SuppressWarnings("rawtypes")
308    private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) throws JPAExecutorException {
309        updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean()));
310        slaCalc.setLastModifiedTime(new Date());
311        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
312                new SLASummaryBean(slaCalc)));
313    }
314
315    @SuppressWarnings("rawtypes")
316    private void updateDBSlaExpectedValues(SLACalcStatus slaCalc, List<UpdateEntry> updateList)
317            throws JPAExecutorException {
318        slaCalc.setLastModifiedTime(new Date());
319        updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_EXPECTED_VALUE, slaCalc
320                .getSLARegistrationBean()));
321        updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
322                new SLASummaryBean(slaCalc)));
323    }
324
325    @SuppressWarnings("rawtypes")
326    private void executeBatchQuery(List<UpdateEntry> updateList) throws JPAExecutorException {
327        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
328    }
329
330    /**
331     * Periodically run by the SLAService worker threads to update SLA status by
332     * iterating through all the jobs in the map
333     */
334    @Override
335    public void updateAllSlaStatus() {
336        LOG.info("Running periodic SLA check");
337        Iterator<String> iterator = slaMap.keySet().iterator();
338        while (iterator.hasNext()) {
339            String jobId = iterator.next();
340            try {
341                LOG.trace("Processing SLA for jobid={0}", jobId);
342                updateJobSla(jobId);
343            }
344            catch (Exception e) {
345                setLogPrefix(jobId);
346                LOG.error("Exception in SLA processing for job [{0}]", jobId, e);
347                LogUtils.clearLogPrefix();
348            }
349        }
350    }
351
352    /**
353     * Register a new job into the map for SLA tracking
354     */
355    @Override
356    public boolean addRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
357        try {
358            if (slaMap.size() < capacity) {
359                SLACalcStatus slaCalc = new SLACalcStatus(reg);
360                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
361                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
362                slaMap.put(jobId, slaCalc);
363                List<JsonBean> insertList = new ArrayList<JsonBean>();
364                final SLASummaryBean summaryBean = new SLASummaryBean(slaCalc);
365                final Timestamp currentTime = DateUtils.convertDateToTimestamp(new Date());
366                reg.setCreatedTimestamp(currentTime);
367                summaryBean.setCreatedTimestamp(currentTime);
368                insertList.add(reg);
369                insertList.add(summaryBean);
370                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
371                LOG.trace("SLA Registration Event - Job:" + jobId);
372                return true;
373            }
374            else {
375                setLogPrefix(reg.getId());
376                LOG.error(
377                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
378                        reg.getId());
379                LogUtils.clearLogPrefix();
380            }
381        }
382        catch (JPAExecutorException jpa) {
383            throw jpa;
384        }
385        return false;
386    }
387
388    private String getJobStatus(AppType appType) {
389        String status = null;
390        switch (appType) {
391            case COORDINATOR_ACTION:
392                status = CoordinatorAction.Status.WAITING.name();
393                break;
394            case WORKFLOW_ACTION:
395                status = WorkflowAction.Status.PREP.name();
396                break;
397            case WORKFLOW_JOB:
398                status = WorkflowJob.Status.PREP.name();
399                break;
400            default:
401                break;
402        }
403        return status;
404    }
405
406    /**
407     * Update job into the map for SLA tracking
408     */
409    @Override
410    public boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException {
411        try {
412            if (slaMap.size() < capacity) {
413                SLACalcStatus slaCalc = new SLACalcStatus(reg);
414                slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
415                slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
416                slaMap.put(jobId, slaCalc);
417
418                @SuppressWarnings("rawtypes")
419                List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
420                updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg));
421                updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
422                        new SLASummaryBean(slaCalc)));
423                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
424                LOG.trace("SLA Registration Event - Job:" + jobId);
425                return true;
426            }
427            else {
428                setLogPrefix(reg.getId());
429                LOG.error(
430                        "SLACalculator memory capacity reached. Cannot add or update new SLA Registration entry for job [{0}]",
431                        reg.getId());
432                LogUtils.clearLogPrefix();
433            }
434        }
435        catch (JPAExecutorException jpa) {
436            throw jpa;
437        }
438        return false;
439    }
440
441    /**
442     * Remove job from being tracked in map
443     */
444    @Override
445    public void removeRegistration(String jobId) {
446        if (slaMap.remove(jobId) == null) {
447            historySet.remove(jobId);
448        }
449    }
450
451    /**
452     * Triggered after receiving Job status change event, update SLA status
453     * accordingly
454     */
455    @Override
456    public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime,
457            Date endTime) throws JPAExecutorException, ServiceException {
458        LOG.debug(
459                "Received addJobStatus request for job  [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], "
460                        + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime);
461        SLACalcStatus slaCalc = slaMap.get(jobId);
462        boolean firstCheckAfterRetstart = checkAndUpdateSLACalcAfterRestart(slaCalc);
463        if (slaCalc == null) {
464            SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get(
465                    SLARegQuery.GET_SLA_REG_ALL, jobId);
466            if (slaRegBean != null) { // filter out jobs picked by SLA job event listener
467                                      // but not actually configured for SLA
468                SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(
469                        SLASummaryQuery.GET_SLA_SUMMARY, jobId);
470                slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
471                slaMap.put(jobId, slaCalc);
472            }
473        }
474        else {
475            SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
476                    SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
477            byte eventProc = summaryBean.getEventProcessed();
478            if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
479                // Update last modified time.
480                slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
481                reloadExpectedTimeAndConfig(slaCalc);
482                LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB");
483
484            }
485            slaCalc.setEventProcessed(eventProc);
486        }
487        if (slaCalc != null) {
488            try {
489                SLAXCommandFactory.getSLAEventXCommand(slaCalc,
490                        ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 1000)).call();
491                checkEventProc(slaCalc);
492            }
493            catch (XException e) {
494                if (firstCheckAfterRetstart) {
495                    slaCalc.setSLARegistrationBean(null);
496                }
497                LOG.error(e);
498                throw new ServiceException(e);
499            }
500            return true;
501        }
502        else {
503            return false;
504        }
505    }
506
507    private void checkEventProc(SLACalcStatus slaCalc){
508        byte eventProc = slaCalc.getEventProcessed();
509        if (slaCalc.getEventProcessed() >= 8) {
510            slaMap.remove(slaCalc.getId());
511            LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId());
512        }
513        if (eventProc == 7) {
514            historySet.add(slaCalc.getId());
515            slaMap.remove(slaCalc.getId());
516            LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId());
517        }
518    }
519
520    public void reloadExpectedTimeAndConfig(SLACalcStatus slaCalc) throws JPAExecutorException {
521        SLARegistrationBean regBean = SLARegistrationQueryExecutor.getInstance().get(
522                SLARegQuery.GET_SLA_EXPECTED_VALUE_CONFIG, slaCalc.getId());
523
524        if (regBean.getExpectedDuration() > 0) {
525            slaCalc.getSLARegistrationBean().setExpectedDuration(regBean.getExpectedDuration());
526        }
527        if (regBean.getExpectedEnd() != null) {
528            slaCalc.getSLARegistrationBean().setExpectedEnd(regBean.getExpectedEnd());
529        }
530        if (regBean.getExpectedStart() != null) {
531            slaCalc.getSLARegistrationBean().setExpectedStart(regBean.getExpectedStart());
532        }
533        if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) {
534            slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
535                    regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT));
536        }
537        if (regBean.getNominalTime() != null) {
538            slaCalc.getSLARegistrationBean().setNominalTime(regBean.getNominalTime());
539        }
540    }
541
542    @VisibleForTesting
543    public boolean isJobIdInSLAMap(String jobId) {
544        return this.slaMap.containsKey(jobId);
545    }
546
547    @VisibleForTesting
548    public boolean isJobIdInHistorySet(String jobId) {
549        return this.historySet.contains(jobId);
550    }
551
552    private void setLogPrefix(String jobId) {
553        LOG = LogUtils.setLogInfo(LOG, jobId, null, null);
554    }
555
556    @Override
557    public boolean enableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
558        boolean isJobFound = false;
559        @SuppressWarnings("rawtypes")
560        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
561        for (String jobId : jobIds) {
562            SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId);
563            slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT);
564            updateDBSlaConfig(slaCalc, updateList);
565            isJobFound = true;
566        }
567        executeBatchQuery(updateList);
568        return isJobFound;
569    }
570
571    @Override
572    public boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
573        return enableAlert(getSLAJobsforParents(parentJobIds));
574    }
575
576    @Override
577    public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException {
578        boolean isJobFound = false;
579        @SuppressWarnings("rawtypes")
580        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
581
582        for (String jobId : jobIds) {
583            SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobId);
584            slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true));
585            updateDBSlaConfig(slaCalc, updateList);
586            isJobFound = true;
587        }
588        executeBatchQuery(updateList);
589        return isJobFound;
590    }
591
592    @Override
593    public boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException {
594        return disableAlert(getSLAJobsforParents(parentJobIds));
595    }
596
597    @Override
598    public boolean changeDefinition(List<Pair<String, Map<String, String>>> jobIdsSLAPair) throws JPAExecutorException,
599            ServiceException {
600        boolean isJobFound = false;
601        @SuppressWarnings("rawtypes")
602        List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>();
603        for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) {
604            SLACalcStatus slaCalc = getOrCreateSLACalcStatus(jobIdSLAPair.getFist());
605            updateParams(slaCalc, jobIdSLAPair.getSecond());
606            updateDBSlaExpectedValues(slaCalc, updateList);
607            isJobFound = true;
608        }
609        executeBatchQuery(updateList);
610        return isJobFound;
611    }
612
613    private void updateParams(SLACalcStatus slaCalc, Map<String, String> newParams) throws ServiceException {
614        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
615        if (newParams != null) {
616            try {
617                Date newNominal = SLAOperations.setNominalTime(newParams.get(RestConstants.SLA_NOMINAL_TIME), reg);
618                SLAOperations.setExpectedStart(newParams.get(RestConstants.SLA_SHOULD_START), newNominal, reg);
619                SLAOperations.setExpectedEnd(newParams.get(RestConstants.SLA_SHOULD_END), newNominal, reg);
620                SLAOperations.setExpectedDuration(newParams.get(RestConstants.SLA_MAX_DURATION), reg);
621            }
622            catch (CommandException ce) {
623                throw new ServiceException(ce);
624            }
625        }
626    }
627
628    private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException {
629        List<String> childJobIds = new ArrayList<String>();
630        for (String jobId : parentJobIds) {
631            List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList(
632                    SLARegQuery.GET_SLA_REG_FOR_PARENT_ID, jobId);
633            for (SLARegistrationBean bean : registrationBeanList) {
634                childJobIds.add(bean.getId());
635            }
636        }
637        return childJobIds;
638    }
639
640    private boolean checkAndUpdateSLACalcAfterRestart(SLACalcStatus slaCalc) throws JPAExecutorException {
641        if (slaCalc != null && slaCalc.getSLARegistrationBean() == null) {
642            return updateSLARegistartion(slaCalc);
643        }
644        return false;
645    }
646
647    public boolean updateSLARegistartion(SLACalcStatus slaCalc) throws JPAExecutorException {
648        if (slaCalc.getSLARegistrationBean() == null) {
649            synchronized (slaCalc) {
650                if (slaCalc.getSLARegistrationBean() == null) {
651                    SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance()
652                            .get(SLARegQuery.GET_SLA_REG_ON_RESTART, slaCalc.getId());
653                    slaCalc.updateSLARegistrationBean(slaRegBean);
654                    return true;
655                }
656            }
657        }
658        return false;
659    }
660}