001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.sla.service;
020
021import java.util.Date;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.client.event.JobEvent.EventStatus;
028import org.apache.oozie.executor.jpa.JPAExecutorException;
029import org.apache.oozie.service.ConfigurationService;
030import org.apache.oozie.service.EventHandlerService;
031import org.apache.oozie.service.SchedulerService;
032import org.apache.oozie.service.Service;
033import org.apache.oozie.service.ServiceException;
034import org.apache.oozie.service.Services;
035import org.apache.oozie.sla.SLACalculator;
036import org.apache.oozie.sla.SLACalculatorMemory;
037import org.apache.oozie.sla.SLARegistrationBean;
038import org.apache.oozie.util.Pair;
039import org.apache.oozie.util.XLog;
040
041import com.google.common.annotations.VisibleForTesting;
042
043public class SLAService implements Service {
044
045    public static final String CONF_PREFIX = "oozie.sla.service.SLAService.";
046    public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
047    public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
048    public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
049    public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
050    public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
051    //Time interval, in seconds, at which SLA Worker will be scheduled to run
052    public static final String CONF_SLA_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
053    public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay";
054    public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout";
055    public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval";
056
057    private static SLACalculator calcImpl;
058    private static boolean slaEnabled = false;
059    private EventHandlerService eventHandler;
060    public static XLog LOG;
061    @Override
062    public void init(Services services) throws ServiceException {
063        try {
064            Configuration conf = services.getConf();
065            Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) ConfigurationService.getClass(
066                    conf, CONF_CALCULATOR_IMPL);
067            calcImpl = calcClazz == null ? new SLACalculatorMemory() : (SLACalculator) calcClazz.newInstance();
068            calcImpl.init(conf);
069            eventHandler = Services.get().get(EventHandlerService.class);
070            if (eventHandler == null) {
071                throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config "
072                        + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
073            }
074            LOG = XLog.getLog(getClass());
075            java.util.Set<String> appTypes = eventHandler.getAppTypes();
076            appTypes.add("workflow_action");
077            eventHandler.setAppTypes(appTypes);
078
079            Runnable slaThread = new SLAWorker(calcImpl);
080            // schedule runnable by default every 30 sec
081            int slaCheckInterval = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INTERVAL);
082            int slaCheckInitialDelay = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INITIAL_DELAY);
083            services.get(SchedulerService.class).schedule(slaThread, slaCheckInitialDelay, slaCheckInterval,
084                    SchedulerService.Unit.SEC);
085            slaEnabled = true;
086            LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
087                    conf.get(SLAService.CONF_CAPACITY));
088        }
089        catch (Exception ex) {
090            throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
091        }
092    }
093
094    @Override
095    public void destroy() {
096        slaEnabled = false;
097    }
098
099    @Override
100    public Class<? extends Service> getInterface() {
101        return SLAService.class;
102    }
103
104    public static boolean isEnabled() {
105        return slaEnabled;
106    }
107
108    @VisibleForTesting
109    public SLACalculator getSLACalculator() {
110        return calcImpl;
111    }
112
113    public void runSLAWorker() {
114        new SLAWorker(calcImpl).run();
115    }
116
117    private class SLAWorker implements Runnable {
118
119        SLACalculator calc;
120
121        public SLAWorker(SLACalculator calc) {
122            this.calc = calc;
123        }
124
125        @Override
126        public void run() {
127            if (Thread.currentThread().isInterrupted()) {
128                return;
129            }
130            try {
131                calc.updateAllSlaStatus();
132            }
133            catch (Throwable error) {
134                XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error);
135            }
136        }
137    }
138
139    public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
140        try {
141            if (calcImpl.addRegistration(reg.getId(), reg)) {
142                return true;
143            }
144            else {
145                LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId());
146            }
147        }
148        catch (JPAExecutorException ex) {
149            LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex);
150        }
151        return false;
152    }
153
154    public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
155        try {
156            if (calcImpl.updateRegistration(reg.getId(), reg)) {
157                return true;
158            }
159            else {
160                LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
161            }
162        }
163        catch (JPAExecutorException ex) {
164            LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
165        }
166        return false;
167    }
168
169    public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
170            throws ServiceException {
171        try {
172            if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) {
173                return true;
174            }
175        }
176        catch (JPAExecutorException jpe) {
177            LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId);
178        }
179        return false;
180    }
181
182    public void removeRegistration(String jobId) {
183        calcImpl.removeRegistration(jobId);
184    }
185
186    /**
187     * Enable jobs sla alert.
188     *
189     * @param jobIds the job ids
190     * @return true, if successful
191     * @throws ServiceException the service exception
192     */
193    public boolean enableAlert(List<String> jobIds) throws ServiceException {
194        try {
195            return calcImpl.enableAlert(jobIds);
196        }
197        catch (JPAExecutorException jpe) {
198            LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0));
199            throw new ServiceException(jpe);
200        }
201    }
202
203    /**
204     * Enable child jobs sla alert.
205     *
206     * @param parentJobIds the parent job ids
207     * @return true, if successful
208     * @throws ServiceException the service exception
209     */
210    public boolean enableChildJobAlert(List<String> parentJobIds) throws ServiceException {
211        try {
212            return calcImpl.enableChildJobAlert(parentJobIds);
213        }
214        catch (JPAExecutorException jpe) {
215            LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0));
216            throw new ServiceException(jpe);
217        }
218    }
219
220    /**
221     * Disable jobs Sla alert.
222     *
223     * @param jobIds the job ids
224     * @return true, if successful
225     * @throws ServiceException the service exception
226     */
227    public boolean disableAlert(List<String> jobIds) throws ServiceException {
228        try {
229            return calcImpl.disableAlert(jobIds);
230        }
231        catch (JPAExecutorException jpe) {
232            LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0));
233            throw new ServiceException(jpe);
234        }
235    }
236
237    /**
238     * Disable child jobs Sla alert.
239     *
240     * @param parentJobIds the parent job ids
241     * @return true, if successful
242     * @throws ServiceException the service exception
243     */
244    public boolean disableChildJobAlert(List<String> parentJobIds) throws ServiceException {
245        try {
246            return calcImpl.disableChildJobAlert(parentJobIds);
247        }
248        catch (JPAExecutorException jpe) {
249            LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0));
250            throw new ServiceException(jpe);
251        }
252    }
253
254    /**
255     * Change jobs Sla definitions
256     * It takes list of pairs of jobid and key/value pairs of el evaluated sla definition.
257     * Support definition are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration.
258     *
259     * @param idSlaDefinitionList the job ids sla pair
260     * @return true, if successful
261     * @throws ServiceException the service exception
262     */
263    public boolean changeDefinition(List<Pair<String, Map<String, String>>> idSlaDefinitionList)
264            throws ServiceException {
265        try {
266            return calcImpl.changeDefinition(idSlaDefinitionList);
267        }
268        catch (JPAExecutorException jpe) {
269            throw new ServiceException(jpe);
270        }
271    }
272}