This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.sla.service;
019
020 import java.util.Date;
021 import org.apache.hadoop.conf.Configuration;
022 import org.apache.oozie.ErrorCode;
023 import org.apache.oozie.client.event.JobEvent.EventStatus;
024 import org.apache.oozie.executor.jpa.JPAExecutorException;
025 import org.apache.oozie.service.EventHandlerService;
026 import org.apache.oozie.service.SchedulerService;
027 import org.apache.oozie.service.Service;
028 import org.apache.oozie.service.ServiceException;
029 import org.apache.oozie.service.Services;
030 import org.apache.oozie.sla.SLACalculator;
031 import org.apache.oozie.sla.SLACalculatorMemory;
032 import org.apache.oozie.sla.SLARegistrationBean;
033 import org.apache.oozie.util.XLog;
034 import com.google.common.annotations.VisibleForTesting;
035
036 public class SLAService implements Service {
037
038 public static final String CONF_PREFIX = "oozie.sla.service.SLAService.";
039 public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
040 public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
041 public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
042 public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
043 public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
044
045 private static SLACalculator calcImpl;
046 private static boolean slaEnabled = false;
047 private EventHandlerService eventHandler;
048 public static XLog LOG;
049 @Override
050 public void init(Services services) throws ServiceException {
051 try {
052 Configuration conf = services.getConf();
053 Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) conf.getClass(
054 CONF_CALCULATOR_IMPL, null);
055 calcImpl = calcClazz == null ? new SLACalculatorMemory() : (SLACalculator) calcClazz.newInstance();
056 calcImpl.init(conf);
057 eventHandler = Services.get().get(EventHandlerService.class);
058 if (eventHandler == null) {
059 throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config "
060 + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
061 }
062 LOG = XLog.getLog(getClass());
063 java.util.Set<String> appTypes = eventHandler.getAppTypes();
064 appTypes.add("workflow_action");
065 eventHandler.setAppTypes(appTypes);
066
067 Runnable slaThread = new SLAWorker(calcImpl);
068 // schedule runnable by default every 30 sec
069 services.get(SchedulerService.class).schedule(slaThread, 10, 30, SchedulerService.Unit.SEC);
070 slaEnabled = true;
071 LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
072 conf.get(SLAService.CONF_CAPACITY));
073 }
074 catch (Exception ex) {
075 throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
076 }
077 }
078
079 @Override
080 public void destroy() {
081 slaEnabled = false;
082 }
083
084 @Override
085 public Class<? extends Service> getInterface() {
086 return SLAService.class;
087 }
088
089 public static boolean isEnabled() {
090 return slaEnabled;
091 }
092
093 public SLACalculator getSLACalculator() {
094 return calcImpl;
095 }
096
097 @VisibleForTesting
098 public void runSLAWorker() {
099 new SLAWorker(calcImpl).run();
100 }
101
102 private class SLAWorker implements Runnable {
103
104 SLACalculator calc;
105
106 public SLAWorker(SLACalculator calc) {
107 this.calc = calc;
108 }
109
110 @Override
111 public void run() {
112 if (Thread.currentThread().isInterrupted()) {
113 return;
114 }
115 try {
116 calc.updateAllSlaStatus();
117 }
118 catch (Throwable error) {
119 XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error);
120 }
121 }
122 }
123
124 public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
125 try {
126 if (calcImpl.addRegistration(reg.getId(), reg)) {
127 return true;
128 }
129 else {
130 LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId());
131 }
132 }
133 catch (JPAExecutorException ex) {
134 LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex);
135 }
136 return false;
137 }
138
139 public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
140 try {
141 if (calcImpl.updateRegistration(reg.getId(), reg)) {
142 return true;
143 }
144 else {
145 LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
146 }
147 }
148 catch (JPAExecutorException ex) {
149 LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
150 }
151 return false;
152 }
153
154 public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
155 throws ServiceException {
156 try {
157 if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) {
158 return true;
159 }
160 }
161 catch (JPAExecutorException jpe) {
162 LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId);
163 }
164 return false;
165 }
166
167 public void removeRegistration(String jobId) {
168 calcImpl.removeRegistration(jobId);
169 }
170
171 }