/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.sla.service;

import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLACalculator;
import org.apache.oozie.sla.SLACalculatorMemory;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.util.XLog;

import com.google.common.annotations.VisibleForTesting;

/**
 * Service that owns the {@link SLACalculator} and periodically asks it to refresh the SLA status of
 * all tracked jobs.
 * <p>
 * The calculator, the enabled flag and the logger are held in static fields that are (re)assigned by
 * {@link #init(Services)}; the registration/status methods simply delegate to the calculator.
 */
public class SLAService implements Service {

    public static final String CONF_PREFIX = "oozie.sla.service.SLAService.";
    public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
    public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
    public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
    public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
    public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
    //Time interval, in seconds, at which SLA Worker will be scheduled to run
    public static final String CONF_SLA_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
    public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay";
    public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout";
    public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval";

    private static SLACalculator calcImpl;
    private static boolean slaEnabled = false;
    private EventHandlerService eventHandler;
    // Assigned in init(); left public and non-final because it is part of the class's visible surface.
    public static XLog LOG;

    /**
     * Initializes the SLA service: instantiates and initializes the calculator, registers the
     * {@code workflow_action} app type with the {@link EventHandlerService} and schedules the
     * periodic SLA worker.
     *
     * @param services the services container providing configuration and sibling services
     * @throws ServiceException with {@link ErrorCode#E0103} if the {@link EventHandlerService} is not
     *         available, any {@link ServiceException} raised during initialization unchanged, or
     *         {@link ErrorCode#E0102} wrapping any other failure
     */
    @Override
    public void init(Services services) throws ServiceException {
        // Set the logger first so it is available regardless of where initialization stops.
        LOG = XLog.getLog(getClass());
        try {
            Configuration conf = services.getConf();

            // The three-argument getClass verifies the configured class implements SLACalculator,
            // which removes the need for an unchecked cast.
            Class<? extends SLACalculator> calcClazz = conf.getClass(CONF_CALCULATOR_IMPL, null,
                    SLACalculator.class);
            if (calcClazz == null) {
                calcImpl = new SLACalculatorMemory();
            }
            else {
                calcImpl = calcClazz.newInstance();
            }
            calcImpl.init(conf);

            eventHandler = services.get(EventHandlerService.class);
            if (eventHandler == null) {
                throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config "
                        + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
            }
            java.util.Set<String> appTypes = eventHandler.getAppTypes();
            appTypes.add("workflow_action");
            eventHandler.setAppTypes(appTypes);

            Runnable slaThread = new SLAWorker(calcImpl);
            // schedule runnable by default every 30 sec, after an initial delay of 10 sec
            int slaCheckInterval = conf.getInt(CONF_SLA_CHECK_INTERVAL, 30);
            int slaCheckInitialDelay = conf.getInt(CONF_SLA_CHECK_INITIAL_DELAY, 10);
            services.get(SchedulerService.class).schedule(slaThread, slaCheckInitialDelay, slaCheckInterval,
                    SchedulerService.Unit.SEC);
            slaEnabled = true;
            LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
                    conf.get(SLAService.CONF_CAPACITY));
        }
        catch (ServiceException ex) {
            // Already carries a specific error code; do not bury it inside a generic E0102.
            throw ex;
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
        }
    }

    /**
     * Marks the SLA service as disabled.
     */
    @Override
    public void destroy() {
        slaEnabled = false;
    }

    /**
     * @return the interface under which this service is registered, i.e. {@code SLAService.class}
     */
    @Override
    public Class<? extends Service> getInterface() {
        return SLAService.class;
    }

    /**
     * @return {@code true} once {@link #init(Services)} has completed successfully and
     *         {@link #destroy()} has not been called since
     */
    public static boolean isEnabled() {
        return slaEnabled;
    }

    /**
     * @return the calculator instance created by {@link #init(Services)}
     */
    @VisibleForTesting
    public SLACalculator getSLACalculator() {
        return calcImpl;
    }

    /**
     * Runs one SLA worker pass synchronously on the calling thread.
     */
    @VisibleForTesting
    public void runSLAWorker() {
        new SLAWorker(calcImpl).run();
    }

    /**
     * Periodic task that asks the calculator to update the SLA status of all jobs. Declared static
     * because it only needs the calculator, not the enclosing service instance.
     */
    private static class SLAWorker implements Runnable {

        private final SLACalculator calc;

        public SLAWorker(SLACalculator calc) {
            this.calc = calc;
        }

        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            try {
                calc.updateAllSlaStatus();
            }
            catch (Throwable error) {
                // Nothing is rethrown from here; the failure is only logged (at debug level).
                XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error);
            }
        }
    }

    /**
     * Adds a new SLA registration to the calculator.
     *
     * @param reg the registration to add; its id is used as the job id
     * @return {@code true} if the calculator accepted the registration; {@code false} if it declined
     *         it or a {@link JPAExecutorException} occurred (both cases are logged as warnings)
     * @throws ServiceException declared for callers; not thrown by the code in this method
     */
    public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
        try {
            if (calcImpl.addRegistration(reg.getId(), reg)) {
                return true;
            }
            LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId());
        }
        catch (JPAExecutorException ex) {
            LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex);
        }
        return false;
    }

    /**
     * Updates an existing SLA registration in the calculator.
     *
     * @param reg the registration to update; its id is used as the job id
     * @return {@code true} if the calculator accepted the update; {@code false} if it declined it or
     *         a {@link JPAExecutorException} occurred (both cases are logged as warnings)
     * @throws ServiceException declared for callers; not thrown by the code in this method
     */
    public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
        try {
            if (calcImpl.updateRegistration(reg.getId(), reg)) {
                return true;
            }
            LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
        }
        catch (JPAExecutorException ex) {
            LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
        }
        return false;
    }

    /**
     * Forwards a job status change to the calculator.
     *
     * @param jobId the id of the job whose status changed
     * @param status the job status string
     * @param eventStatus the event status associated with the change
     * @param startTime the job start time
     * @param endTime the job end time
     * @return {@code true} if the calculator reported success; {@code false} if it did not or a
     *         {@link JPAExecutorException} occurred (the latter is logged as an error)
     * @throws ServiceException declared for callers; not thrown by the code in this method
     */
    public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
            throws ServiceException {
        try {
            if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) {
                return true;
            }
        }
        catch (JPAExecutorException jpe) {
            // Pass the exception as the trailing argument, as the other methods in this class do,
            // so the cause is not lost from the log.
            LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId, jpe);
        }
        return false;
    }

    /**
     * Removes the SLA registration of the given job from the calculator.
     *
     * @param jobId the id of the job whose registration should be removed
     */
    public void removeRegistration(String jobId) {
        calcImpl.removeRegistration(jobId);
    }

}