001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.sla.service;
020
021import java.util.Date;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.oozie.ErrorCode;
027import org.apache.oozie.client.event.JobEvent.EventStatus;
028import org.apache.oozie.executor.jpa.JPAExecutorException;
029import org.apache.oozie.service.ConfigurationService;
030import org.apache.oozie.service.EventHandlerService;
031import org.apache.oozie.service.SchedulerService;
032import org.apache.oozie.service.Service;
033import org.apache.oozie.service.ServiceException;
034import org.apache.oozie.service.Services;
035import org.apache.oozie.sla.SLACalculator;
036import org.apache.oozie.sla.SLACalculatorMemory;
037import org.apache.oozie.sla.SLARegistrationBean;
038import org.apache.oozie.util.Pair;
039import org.apache.oozie.util.XLog;
040
041import com.google.common.annotations.VisibleForTesting;
042
043public class SLAService implements Service {
044
045    public static final String CONF_PREFIX = "oozie.sla.service.SLAService.";
046    public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
047    public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
048    public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
049    public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
050    public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
051    //Time interval, in seconds, at which SLA Worker will be scheduled to run
052    public static final String CONF_SLA_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
053    public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay";
054    public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout";
055    public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval";
056
057    private static SLACalculator calcImpl;
058    private static boolean slaEnabled = false;
059    private EventHandlerService eventHandler;
060    public static XLog LOG;
061    @Override
062    public void init(Services services) throws ServiceException {
063        try {
064            Configuration conf = services.getConf();
065            Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) ConfigurationService.getClass(
066                    conf, CONF_CALCULATOR_IMPL);
067            calcImpl = calcClazz == null ? new SLACalculatorMemory() : (SLACalculator) calcClazz.newInstance();
068            calcImpl.init(conf);
069            eventHandler = Services.get().get(EventHandlerService.class);
070            if (eventHandler == null) {
071                throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config "
072                        + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
073            }
074            LOG = XLog.getLog(getClass());
075            java.util.Set<String> appTypes = eventHandler.getAppTypes();
076            appTypes.add("workflow_action");
077            eventHandler.setAppTypes(appTypes);
078
079            Runnable slaThread = new SLAWorker(calcImpl);
080            // schedule runnable by default every 30 sec
081            int slaCheckInterval = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INTERVAL);
082            int slaCheckInitialDelay = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INITIAL_DELAY);
083            services.get(SchedulerService.class).schedule(slaThread, slaCheckInitialDelay, slaCheckInterval,
084                    SchedulerService.Unit.SEC);
085            slaEnabled = true;
086            LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
087                    conf.get(SLAService.CONF_CAPACITY));
088        }
089        catch (Exception ex) {
090            throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
091        }
092    }
093
094    @Override
095    public void destroy() {
096        slaEnabled = false;
097    }
098
099    @Override
100    public Class<? extends Service> getInterface() {
101        return SLAService.class;
102    }
103
104    public static boolean isEnabled() {
105        return slaEnabled;
106    }
107
108    @VisibleForTesting
109    public SLACalculator getSLACalculator() {
110        return calcImpl;
111    }
112
113    public void runSLAWorker() {
114        new SLAWorker(calcImpl).run();
115    }
116
117    @VisibleForTesting
118    public void startSLAWorker() {
119        new Thread(new SLAWorker(calcImpl)).start();
120    }
121
122    private class SLAWorker implements Runnable {
123
124        SLACalculator calc;
125
126        public SLAWorker(SLACalculator calc) {
127            this.calc = calc;
128        }
129
130        @Override
131        public void run() {
132            if (Thread.currentThread().isInterrupted()) {
133                return;
134            }
135            try {
136                calc.updateAllSlaStatus();
137            }
138            catch (Throwable error) {
139                XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error);
140            }
141        }
142    }
143
144    public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
145        try {
146            if (calcImpl.addRegistration(reg.getId(), reg)) {
147                return true;
148            }
149            else {
150                LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId());
151            }
152        }
153        catch (JPAExecutorException ex) {
154            LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex);
155        }
156        return false;
157    }
158
159    public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
160        try {
161            if (calcImpl.updateRegistration(reg.getId(), reg)) {
162                return true;
163            }
164            else {
165                LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
166            }
167        }
168        catch (JPAExecutorException ex) {
169            LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
170        }
171        return false;
172    }
173
174    public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
175            throws ServiceException {
176        try {
177            if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) {
178                return true;
179            }
180        }
181        catch (JPAExecutorException jpe) {
182            LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId);
183        }
184        return false;
185    }
186
187    public void removeRegistration(String jobId) {
188        calcImpl.removeRegistration(jobId);
189    }
190
191    /**
192     * Enable jobs sla alert.
193     *
194     * @param jobIds the job ids
195     * @return true, if successful
196     * @throws ServiceException the service exception
197     */
198    public boolean enableAlert(List<String> jobIds) throws ServiceException {
199        try {
200            return calcImpl.enableAlert(jobIds);
201        }
202        catch (JPAExecutorException jpe) {
203            LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0));
204            throw new ServiceException(jpe);
205        }
206    }
207
208    /**
209     * Enable child jobs sla alert.
210     *
211     * @param parentJobIds the parent job ids
212     * @return true, if successful
213     * @throws ServiceException the service exception
214     */
215    public boolean enableChildJobAlert(List<String> parentJobIds) throws ServiceException {
216        try {
217            return calcImpl.enableChildJobAlert(parentJobIds);
218        }
219        catch (JPAExecutorException jpe) {
220            LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0));
221            throw new ServiceException(jpe);
222        }
223    }
224
225    /**
226     * Disable jobs Sla alert.
227     *
228     * @param jobIds the job ids
229     * @return true, if successful
230     * @throws ServiceException the service exception
231     */
232    public boolean disableAlert(List<String> jobIds) throws ServiceException {
233        try {
234            return calcImpl.disableAlert(jobIds);
235        }
236        catch (JPAExecutorException jpe) {
237            LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0));
238            throw new ServiceException(jpe);
239        }
240    }
241
242    /**
243     * Disable child jobs Sla alert.
244     *
245     * @param parentJobIds the parent job ids
246     * @return true, if successful
247     * @throws ServiceException the service exception
248     */
249    public boolean disableChildJobAlert(List<String> parentJobIds) throws ServiceException {
250        try {
251            return calcImpl.disableChildJobAlert(parentJobIds);
252        }
253        catch (JPAExecutorException jpe) {
254            LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0));
255            throw new ServiceException(jpe);
256        }
257    }
258
259    /**
260     * Change jobs Sla definitions
261     * It takes list of pairs of jobid and key/value pairs of el evaluated sla definition.
262     * Support definition are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration.
263     *
264     * @param idSlaDefinitionList the job ids sla pair
265     * @return true, if successful
266     * @throws ServiceException the service exception
267     */
268    public boolean changeDefinition(List<Pair<String, Map<String, String>>> idSlaDefinitionList)
269            throws ServiceException {
270        try {
271            return calcImpl.changeDefinition(idSlaDefinitionList);
272        }
273        catch (JPAExecutorException jpe) {
274            throw new ServiceException(jpe);
275        }
276    }
277}