001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.sla.service; 020 021import java.util.Date; 022import java.util.List; 023import java.util.Map; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.oozie.ErrorCode; 027import org.apache.oozie.client.event.JobEvent.EventStatus; 028import org.apache.oozie.executor.jpa.JPAExecutorException; 029import org.apache.oozie.service.ConfigurationService; 030import org.apache.oozie.service.EventHandlerService; 031import org.apache.oozie.service.SchedulerService; 032import org.apache.oozie.service.Service; 033import org.apache.oozie.service.ServiceException; 034import org.apache.oozie.service.Services; 035import org.apache.oozie.sla.SLACalculator; 036import org.apache.oozie.sla.SLACalculatorMemory; 037import org.apache.oozie.sla.SLARegistrationBean; 038import org.apache.oozie.util.Pair; 039import org.apache.oozie.util.XLog; 040 041import com.google.common.annotations.VisibleForTesting; 042 043public class SLAService implements Service { 044 045 public static final String CONF_PREFIX = "oozie.sla.service.SLAService."; 046 public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl"; 047 public static final String CONF_CAPACITY = CONF_PREFIX + "capacity"; 048 public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events"; 049 public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after"; 050 public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency"; 051 //Time interval, in seconds, at which SLA Worker will be scheduled to run 052 public static final String CONF_SLA_CHECK_INTERVAL = CONF_PREFIX + "check.interval"; 053 public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay"; 054 public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout"; 055 public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval"; 056 057 private static SLACalculator calcImpl; 058 private static boolean slaEnabled = false; 059 private EventHandlerService eventHandler; 060 public static XLog LOG; 061 @Override 062 public void init(Services services) throws ServiceException { 063 try { 064 Configuration conf = services.getConf(); 065 Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) ConfigurationService.getClass( 066 conf, CONF_CALCULATOR_IMPL); 067 calcImpl = calcClazz == null ? new SLACalculatorMemory() : (SLACalculator) calcClazz.newInstance(); 068 calcImpl.init(conf); 069 eventHandler = Services.get().get(EventHandlerService.class); 070 if (eventHandler == null) { 071 throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config " 072 + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService"); 073 } 074 LOG = XLog.getLog(getClass()); 075 java.util.Set<String> appTypes = eventHandler.getAppTypes(); 076 appTypes.add("workflow_action"); 077 eventHandler.setAppTypes(appTypes); 078 079 Runnable slaThread = new SLAWorker(calcImpl); 080 // schedule runnable by default every 30 sec 081 int slaCheckInterval = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INTERVAL); 082 int slaCheckInitialDelay = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INITIAL_DELAY); 083 services.get(SchedulerService.class).schedule(slaThread, slaCheckInitialDelay, slaCheckInterval, 084 SchedulerService.Unit.SEC); 085 slaEnabled = true; 086 LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(), 087 conf.get(SLAService.CONF_CAPACITY)); 088 } 089 catch (Exception ex) { 090 throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex); 091 } 092 } 093 094 @Override 095 public void destroy() { 096 slaEnabled = false; 097 } 098 099 @Override 100 public Class<? extends Service> getInterface() { 101 return SLAService.class; 102 } 103 104 public static boolean isEnabled() { 105 return slaEnabled; 106 } 107 108 @VisibleForTesting 109 public SLACalculator getSLACalculator() { 110 return calcImpl; 111 } 112 113 public void runSLAWorker() { 114 new SLAWorker(calcImpl).run(); 115 } 116 117 @VisibleForTesting 118 public void startSLAWorker() { 119 new Thread(new SLAWorker(calcImpl)).start(); 120 } 121 122 private class SLAWorker implements Runnable { 123 124 SLACalculator calc; 125 126 public SLAWorker(SLACalculator calc) { 127 this.calc = calc; 128 } 129 130 @Override 131 public void run() { 132 if (Thread.currentThread().isInterrupted()) { 133 return; 134 } 135 try { 136 calc.updateAllSlaStatus(); 137 } 138 catch (Throwable error) { 139 XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error); 140 } 141 } 142 } 143 144 public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException { 145 try { 146 if (calcImpl.addRegistration(reg.getId(), reg)) { 147 return true; 148 } 149 else { 150 LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId()); 151 } 152 } 153 catch (JPAExecutorException ex) { 154 LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex); 155 } 156 return false; 157 } 158 159 public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException { 160 try { 161 if (calcImpl.updateRegistration(reg.getId(), reg)) { 162 return true; 163 } 164 else { 165 LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId()); 166 } 167 } 168 catch (JPAExecutorException ex) { 169 LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex); 170 } 171 return false; 172 } 173 174 public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime) 175 throws ServiceException { 176 try { 177 if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) { 178 return true; 179 } 180 } 181 catch (JPAExecutorException jpe) { 182 LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId); 183 } 184 return false; 185 } 186 187 public void removeRegistration(String jobId) { 188 calcImpl.removeRegistration(jobId); 189 } 190 191 /** 192 * Enable jobs sla alert. 193 * 194 * @param jobIds the job ids 195 * @return true, if successful 196 * @throws ServiceException the service exception 197 */ 198 public boolean enableAlert(List<String> jobIds) throws ServiceException { 199 try { 200 return calcImpl.enableAlert(jobIds); 201 } 202 catch (JPAExecutorException jpe) { 203 LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0)); 204 throw new ServiceException(jpe); 205 } 206 } 207 208 /** 209 * Enable child jobs sla alert. 210 * 211 * @param parentJobIds the parent job ids 212 * @return true, if successful 213 * @throws ServiceException the service exception 214 */ 215 public boolean enableChildJobAlert(List<String> parentJobIds) throws ServiceException { 216 try { 217 return calcImpl.enableChildJobAlert(parentJobIds); 218 } 219 catch (JPAExecutorException jpe) { 220 LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0)); 221 throw new ServiceException(jpe); 222 } 223 } 224 225 /** 226 * Disable jobs Sla alert. 227 * 228 * @param jobIds the job ids 229 * @return true, if successful 230 * @throws ServiceException the service exception 231 */ 232 public boolean disableAlert(List<String> jobIds) throws ServiceException { 233 try { 234 return calcImpl.disableAlert(jobIds); 235 } 236 catch (JPAExecutorException jpe) { 237 LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0)); 238 throw new ServiceException(jpe); 239 } 240 } 241 242 /** 243 * Disable child jobs Sla alert. 244 * 245 * @param parentJobIds the parent job ids 246 * @return true, if successful 247 * @throws ServiceException the service exception 248 */ 249 public boolean disableChildJobAlert(List<String> parentJobIds) throws ServiceException { 250 try { 251 return calcImpl.disableChildJobAlert(parentJobIds); 252 } 253 catch (JPAExecutorException jpe) { 254 LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0)); 255 throw new ServiceException(jpe); 256 } 257 } 258 259 /** 260 * Change jobs Sla definitions 261 * It takes list of pairs of jobid and key/value pairs of el evaluated sla definition. 262 * Support definition are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration. 263 * 264 * @param idSlaDefinitionList the job ids sla pair 265 * @return true, if successful 266 * @throws ServiceException the service exception 267 */ 268 public boolean changeDefinition(List<Pair<String, Map<String, String>>> idSlaDefinitionList) 269 throws ServiceException { 270 try { 271 return calcImpl.changeDefinition(idSlaDefinitionList); 272 } 273 catch (JPAExecutorException jpe) { 274 throw new ServiceException(jpe); 275 } 276 } 277}