001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.sla.service;
019
020import java.util.Date;
021
022import org.apache.hadoop.conf.Configuration;
023import org.apache.oozie.ErrorCode;
024import org.apache.oozie.client.event.JobEvent.EventStatus;
025import org.apache.oozie.executor.jpa.JPAExecutorException;
026import org.apache.oozie.service.EventHandlerService;
027import org.apache.oozie.service.SchedulerService;
028import org.apache.oozie.service.Service;
029import org.apache.oozie.service.ServiceException;
030import org.apache.oozie.service.Services;
031import org.apache.oozie.sla.SLACalculator;
032import org.apache.oozie.sla.SLACalculatorMemory;
033import org.apache.oozie.sla.SLARegistrationBean;
034import org.apache.oozie.util.XLog;
035
036import com.google.common.annotations.VisibleForTesting;
037
038public class SLAService implements Service {
039
040    public static final String CONF_PREFIX = "oozie.sla.service.SLAService.";
041    public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
042    public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
043    public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
044    public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
045    public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
046    //Time interval, in seconds, at which SLA Worker will be scheduled to run
047    public static final String CONF_SLA_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
048    public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay";
049    public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout";
050    public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval";
051
052    private static SLACalculator calcImpl;
053    private static boolean slaEnabled = false;
054    private EventHandlerService eventHandler;
055    public static XLog LOG;
056    @Override
057    public void init(Services services) throws ServiceException {
058        try {
059            Configuration conf = services.getConf();
060            Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) conf.getClass(
061                    CONF_CALCULATOR_IMPL, null);
062            calcImpl = calcClazz == null ? new SLACalculatorMemory() : (SLACalculator) calcClazz.newInstance();
063            calcImpl.init(conf);
064            eventHandler = Services.get().get(EventHandlerService.class);
065            if (eventHandler == null) {
066                throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config "
067                        + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
068            }
069            LOG = XLog.getLog(getClass());
070            java.util.Set<String> appTypes = eventHandler.getAppTypes();
071            appTypes.add("workflow_action");
072            eventHandler.setAppTypes(appTypes);
073
074            Runnable slaThread = new SLAWorker(calcImpl);
075            // schedule runnable by default every 30 sec
076            int slaCheckInterval = services.getConf().getInt(CONF_SLA_CHECK_INTERVAL, 30);
077            int slaCheckInitialDelay = services.getConf().getInt(CONF_SLA_CHECK_INITIAL_DELAY, 10);
078            services.get(SchedulerService.class).schedule(slaThread, slaCheckInitialDelay, slaCheckInterval,
079                    SchedulerService.Unit.SEC);
080            slaEnabled = true;
081            LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
082                    conf.get(SLAService.CONF_CAPACITY));
083        }
084        catch (Exception ex) {
085            throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
086        }
087    }
088
089    @Override
090    public void destroy() {
091        slaEnabled = false;
092    }
093
094    @Override
095    public Class<? extends Service> getInterface() {
096        return SLAService.class;
097    }
098
099    public static boolean isEnabled() {
100        return slaEnabled;
101    }
102
103    @VisibleForTesting
104    public SLACalculator getSLACalculator() {
105        return calcImpl;
106    }
107
108    @VisibleForTesting
109    public void runSLAWorker() {
110        new SLAWorker(calcImpl).run();
111    }
112
113    private class SLAWorker implements Runnable {
114
115        SLACalculator calc;
116
117        public SLAWorker(SLACalculator calc) {
118            this.calc = calc;
119        }
120
121        @Override
122        public void run() {
123            if (Thread.currentThread().isInterrupted()) {
124                return;
125            }
126            try {
127                calc.updateAllSlaStatus();
128            }
129            catch (Throwable error) {
130                XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error);
131            }
132        }
133    }
134
135    public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
136        try {
137            if (calcImpl.addRegistration(reg.getId(), reg)) {
138                return true;
139            }
140            else {
141                LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId());
142            }
143        }
144        catch (JPAExecutorException ex) {
145            LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex);
146        }
147        return false;
148    }
149
150    public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
151        try {
152            if (calcImpl.updateRegistration(reg.getId(), reg)) {
153                return true;
154            }
155            else {
156                LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
157            }
158        }
159        catch (JPAExecutorException ex) {
160            LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
161        }
162        return false;
163    }
164
165    public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
166            throws ServiceException {
167        try {
168            if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) {
169                return true;
170            }
171        }
172        catch (JPAExecutorException jpe) {
173            LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId);
174        }
175        return false;
176    }
177
178    public void removeRegistration(String jobId) {
179        calcImpl.removeRegistration(jobId);
180    }
181
182}