/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.sla.service;

import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLACalculator;
import org.apache.oozie.sla.SLACalculatorMemory;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;

/**
 * Service that owns the {@link SLACalculator} instance and exposes it to the rest of the server.
 * <p>
 * On {@link #init(Services)} it instantiates the configured calculator (falling back to
 * {@link SLACalculatorMemory}), adds the {@code "workflow_action"} app type to the
 * {@link EventHandlerService}, and schedules an {@link SLAWorker} that periodically calls
 * {@link SLACalculator#updateAllSlaStatus()}.
 * <p>
 * The calculator, the enabled flag and the logger are held in static fields, so they are shared by
 * every instance of this class.
 */
public class SLAService implements Service {

    public static final String CONF_PREFIX = "oozie.sla.service.SLAService.";
    public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
    public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
    public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
    public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
    public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";

    private static SLACalculator calcImpl;
    private static boolean slaEnabled = false;
    private EventHandlerService eventHandler;
    // Assigned in init(); it is null until the service has been initialized.
    public static XLog LOG;

    /**
     * Creates and initializes the SLA calculator, registers the {@code "workflow_action"} app type
     * with the event handler, and schedules the periodic SLA status update.
     *
     * @param services the services instance providing the configuration and the scheduler
     * @throws ServiceException if the {@link EventHandlerService} is not available or if any step
     *         of the initialization fails; the original exception is attached as the cause
     */
    @Override
    public void init(Services services) throws ServiceException {
        try {
            Configuration conf = services.getConf();
            Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) conf.getClass(
                    CONF_CALCULATOR_IMPL, null);
            // No implementation configured: use the in-memory calculator.
            calcImpl = calcClazz == null ? new SLACalculatorMemory() : (SLACalculator) calcClazz.newInstance();
            calcImpl.init(conf);
            eventHandler = Services.get().get(EventHandlerService.class);
            if (eventHandler == null) {
                throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config "
                        + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
            }
            LOG = XLog.getLog(getClass());
            java.util.Set<String> appTypes = eventHandler.getAppTypes();
            appTypes.add("workflow_action");
            eventHandler.setAppTypes(appTypes);

            Runnable slaThread = new SLAWorker(calcImpl);
            // schedule runnable by default every 30 sec
            // (the first numeric argument, 10, is presumably the initial delay -- TODO confirm
            // against SchedulerService.schedule)
            services.get(SchedulerService.class).schedule(slaThread, 10, 30, SchedulerService.Unit.SEC);
            slaEnabled = true;
            LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
                    conf.get(SLAService.CONF_CAPACITY));
        }
        catch (Exception ex) {
            // NOTE(review): this catch-all also appears to intercept the E0103 ServiceException
            // thrown above and re-wrap it as E0102 (the original stays attached as the cause).
            // Left unchanged because callers may depend on the reported error code.
            throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
        }
    }

    /**
     * Marks the SLA service as disabled.
     */
    @Override
    public void destroy() {
        slaEnabled = false;
    }

    /**
     * @return the class under which this service is registered, {@code SLAService.class}
     */
    @Override
    public Class<? extends Service> getInterface() {
        return SLAService.class;
    }

    /**
     * @return {@code true} once {@link #init(Services)} has completed successfully and until
     *         {@link #destroy()} is called
     */
    public static boolean isEnabled() {
        return slaEnabled;
    }

    /**
     * @return the calculator created by {@link #init(Services)}, or {@code null} if it has not
     *         been created yet
     */
    public SLACalculator getSLACalculator() {
        return calcImpl;
    }

    /**
     * Runs one SLA worker pass synchronously on the calling thread.
     */
    @VisibleForTesting
    public void runSLAWorker() {
        new SLAWorker(calcImpl).run();
    }

    /**
     * Runnable that asks the calculator to update all SLA statuses. It never lets a
     * {@link Throwable} escape {@link #run()}.
     */
    private class SLAWorker implements Runnable {

        private final SLACalculator calc;

        public SLAWorker(SLACalculator calc) {
            this.calc = calc;
        }

        @Override
        public void run() {
            // Skip the pass entirely if the executing thread has been interrupted.
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            try {
                calc.updateAllSlaStatus();
            }
            catch (Throwable error) {
                // NOTE(review): failures are only visible at debug level; consider raising the
                // level if silent SLA update failures are a concern.
                XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error);
            }
        }
    }

    /**
     * Adds a new SLA registration to the calculator.
     *
     * @param reg the registration to add; its id is used as the job id
     * @return {@code true} if the calculator accepted the registration, {@code false} if it
     *         rejected it or a {@link JPAExecutorException} occurred (both cases are logged)
     * @throws ServiceException declared for callers; not thrown directly by this method
     */
    public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
        try {
            if (calcImpl.addRegistration(reg.getId(), reg)) {
                return true;
            }
            else {
                LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId());
            }
        }
        catch (JPAExecutorException ex) {
            LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex);
        }
        return false;
    }

    /**
     * Updates an existing SLA registration in the calculator.
     *
     * @param reg the registration to update; its id is used as the job id
     * @return {@code true} if the calculator accepted the update, {@code false} if it rejected it
     *         or a {@link JPAExecutorException} occurred (both cases are logged)
     * @throws ServiceException declared for callers; not thrown directly by this method
     */
    public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
        try {
            if (calcImpl.updateRegistration(reg.getId(), reg)) {
                return true;
            }
            else {
                LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
            }
        }
        catch (JPAExecutorException ex) {
            LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
        }
        return false;
    }

    /**
     * Forwards a job status change to the calculator.
     *
     * @param jobId id of the job whose status changed
     * @param status the job status string passed through to the calculator
     * @param eventStatus the event status passed through to the calculator
     * @param startTime start time passed through to the calculator
     * @param endTime end time passed through to the calculator
     * @return the calculator's result, or {@code false} if a {@link JPAExecutorException}
     *         occurred (the exception is logged)
     * @throws ServiceException declared for callers; not thrown directly by this method
     */
    public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
            throws ServiceException {
        try {
            return calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime);
        }
        catch (JPAExecutorException jpe) {
            // Pass the exception as the trailing argument, as the registration methods do, so the
            // cause is not lost from the log.
            LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId, jpe);
        }
        return false;
    }

    /**
     * Removes the SLA registration of the given job from the calculator.
     *
     * @param jobId id of the job whose registration is removed
     */
    public void removeRegistration(String jobId) {
        calcImpl.removeRegistration(jobId);
    }

}