001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.sla.service; 020 021import java.util.Date; 022import java.util.List; 023import java.util.Map; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.client.event.JobEvent.EventStatus; 028import org.apache.oozie.executor.jpa.JPAExecutorException; 029import org.apache.oozie.service.ConfigurationService; 030import org.apache.oozie.service.EventHandlerService; 031import org.apache.oozie.service.SchedulerService; 032import org.apache.oozie.service.Service; 033import org.apache.oozie.service.ServiceException; 034import org.apache.oozie.service.Services; 035import org.apache.oozie.sla.SLACalculator; 036import org.apache.oozie.sla.SLACalculatorMemory; 037import org.apache.oozie.sla.SLARegistrationBean; 038import org.apache.oozie.util.Pair; 039import org.apache.oozie.util.XLog; 040 041import com.google.common.annotations.VisibleForTesting; 042 043public class SLAService implements Service { 044 045 public static final String CONF_PREFIX = "oozie.sla.service.SLAService."; 046 public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl"; 047 public static final String CONF_CAPACITY = CONF_PREFIX + "capacity"; 048 public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events"; 049 public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after"; 050 public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency"; 051 //Time interval, in seconds, at which SLA Worker will be scheduled to run 052 public static final String CONF_SLA_CHECK_INTERVAL = CONF_PREFIX + "check.interval"; 053 public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay"; 054 public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout"; 055 public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval"; 056 057 private static SLACalculator calcImpl; 058 private static boolean slaEnabled = false; 059 private EventHandlerService eventHandler; 060 public static XLog LOG; 061 @Override 062 public void init(Services services) throws ServiceException { 063 try { 064 Configuration conf = services.getConf(); 065 Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) ConfigurationService.getClass( 066 conf, CONF_CALCULATOR_IMPL); 067 calcImpl = calcClazz == null ? new SLACalculatorMemory() : (SLACalculator) calcClazz.newInstance(); 068 calcImpl.init(conf); 069 eventHandler = Services.get().get(EventHandlerService.class); 070 if (eventHandler == null) { 071 throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config " 072 + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService"); 073 } 074 LOG = XLog.getLog(getClass()); 075 java.util.Set<String> appTypes = eventHandler.getAppTypes(); 076 appTypes.add("workflow_action"); 077 eventHandler.setAppTypes(appTypes); 078 079 Runnable slaThread = new SLAWorker(calcImpl); 080 // schedule runnable by default every 30 sec 081 int slaCheckInterval = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INTERVAL); 082 int slaCheckInitialDelay = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INITIAL_DELAY); 083 services.get(SchedulerService.class).schedule(slaThread, slaCheckInitialDelay, slaCheckInterval, 084 SchedulerService.Unit.SEC); 085 slaEnabled = true; 086 LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(), 087 conf.get(SLAService.CONF_CAPACITY)); 088 } 089 catch (Exception ex) { 090 throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex); 091 } 092 } 093 094 @Override 095 public void destroy() { 096 slaEnabled = false; 097 } 098 099 @Override 100 public Class<? extends Service> getInterface() { 101 return SLAService.class; 102 } 103 104 public static boolean isEnabled() { 105 return slaEnabled; 106 } 107 108 @VisibleForTesting 109 public SLACalculator getSLACalculator() { 110 return calcImpl; 111 } 112 113 public void runSLAWorker() { 114 new SLAWorker(calcImpl).run(); 115 } 116 117 private class SLAWorker implements Runnable { 118 119 SLACalculator calc; 120 121 public SLAWorker(SLACalculator calc) { 122 this.calc = calc; 123 } 124 125 @Override 126 public void run() { 127 if (Thread.currentThread().isInterrupted()) { 128 return; 129 } 130 try { 131 calc.updateAllSlaStatus(); 132 } 133 catch (Throwable error) { 134 XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error); 135 } 136 } 137 } 138 139 public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException { 140 try { 141 if (calcImpl.addRegistration(reg.getId(), reg)) { 142 return true; 143 } 144 else { 145 LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId()); 146 } 147 } 148 catch (JPAExecutorException ex) { 149 LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex); 150 } 151 return false; 152 } 153 154 public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException { 155 try { 156 if (calcImpl.updateRegistration(reg.getId(), reg)) { 157 return true; 158 } 159 else { 160 LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId()); 161 } 162 } 163 catch (JPAExecutorException ex) { 164 LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex); 165 } 166 return false; 167 } 168 169 public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime) 170 throws ServiceException { 171 try { 172 if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) { 173 return true; 174 } 175 } 176 catch (JPAExecutorException jpe) { 177 LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId); 178 } 179 return false; 180 } 181 182 public void removeRegistration(String jobId) { 183 calcImpl.removeRegistration(jobId); 184 } 185 186 /** 187 * Enable jobs sla alert. 188 * 189 * @param jobIds the job ids 190 * @return true, if successful 191 * @throws ServiceException the service exception 192 */ 193 public boolean enableAlert(List<String> jobIds) throws ServiceException { 194 try { 195 return calcImpl.enableAlert(jobIds); 196 } 197 catch (JPAExecutorException jpe) { 198 LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0)); 199 throw new ServiceException(jpe); 200 } 201 } 202 203 /** 204 * Enable child jobs sla alert. 205 * 206 * @param parentJobIds the parent job ids 207 * @return true, if successful 208 * @throws ServiceException the service exception 209 */ 210 public boolean enableChildJobAlert(List<String> parentJobIds) throws ServiceException { 211 try { 212 return calcImpl.enableChildJobAlert(parentJobIds); 213 } 214 catch (JPAExecutorException jpe) { 215 LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0)); 216 throw new ServiceException(jpe); 217 } 218 } 219 220 /** 221 * Disable jobs Sla alert. 222 * 223 * @param jobIds the job ids 224 * @return true, if successful 225 * @throws ServiceException the service exception 226 */ 227 public boolean disableAlert(List<String> jobIds) throws ServiceException { 228 try { 229 return calcImpl.disableAlert(jobIds); 230 } 231 catch (JPAExecutorException jpe) { 232 LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0)); 233 throw new ServiceException(jpe); 234 } 235 } 236 237 /** 238 * Disable child jobs Sla alert. 239 * 240 * @param parentJobIds the parent job ids 241 * @return true, if successful 242 * @throws ServiceException the service exception 243 */ 244 public boolean disableChildJobAlert(List<String> parentJobIds) throws ServiceException { 245 try { 246 return calcImpl.disableChildJobAlert(parentJobIds); 247 } 248 catch (JPAExecutorException jpe) { 249 LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0)); 250 throw new ServiceException(jpe); 251 } 252 } 253 254 /** 255 * Change jobs Sla definitions 256 * It takes list of pairs of jobid and key/value pairs of el evaluated sla definition. 257 * Support definition are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration. 258 * 259 * @param idSlaDefinitionList the job ids sla pair 260 * @return true, if successful 261 * @throws ServiceException the service exception 262 */ 263 public boolean changeDefinition(List<Pair<String, Map<String, String>>> idSlaDefinitionList) 264 throws ServiceException { 265 try { 266 return calcImpl.changeDefinition(idSlaDefinitionList); 267 } 268 catch (JPAExecutorException jpe) { 269 throw new ServiceException(jpe); 270 } 271 } 272}