001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.sla.service;
019    
020    import java.util.Date;
021    import org.apache.hadoop.conf.Configuration;
022    import org.apache.oozie.ErrorCode;
023    import org.apache.oozie.client.event.JobEvent.EventStatus;
024    import org.apache.oozie.executor.jpa.JPAExecutorException;
025    import org.apache.oozie.service.EventHandlerService;
026    import org.apache.oozie.service.SchedulerService;
027    import org.apache.oozie.service.Service;
028    import org.apache.oozie.service.ServiceException;
029    import org.apache.oozie.service.Services;
030    import org.apache.oozie.sla.SLACalculator;
031    import org.apache.oozie.sla.SLACalculatorMemory;
032    import org.apache.oozie.sla.SLARegistrationBean;
033    import org.apache.oozie.util.XLog;
034    import com.google.common.annotations.VisibleForTesting;
035    
036    public class SLAService implements Service {
037    
038        public static final String CONF_PREFIX = "oozie.sla.service.SLAService.";
039        public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
040        public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
041        public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
042        public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
043        public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
044    
045        private static SLACalculator calcImpl;
046        private static boolean slaEnabled = false;
047        private EventHandlerService eventHandler;
048        public static XLog LOG;
049        @Override
050        public void init(Services services) throws ServiceException {
051            try {
052                Configuration conf = services.getConf();
053                Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) conf.getClass(
054                        CONF_CALCULATOR_IMPL, null);
055                calcImpl = calcClazz == null ? new SLACalculatorMemory() : (SLACalculator) calcClazz.newInstance();
056                calcImpl.init(conf);
057                eventHandler = Services.get().get(EventHandlerService.class);
058                if (eventHandler == null) {
059                    throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config "
060                            + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
061                }
062                LOG = XLog.getLog(getClass());
063                java.util.Set<String> appTypes = eventHandler.getAppTypes();
064                appTypes.add("workflow_action");
065                eventHandler.setAppTypes(appTypes);
066    
067                Runnable slaThread = new SLAWorker(calcImpl);
068                // schedule runnable by default every 30 sec
069                services.get(SchedulerService.class).schedule(slaThread, 10, 30, SchedulerService.Unit.SEC);
070                slaEnabled = true;
071                LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
072                        conf.get(SLAService.CONF_CAPACITY));
073            }
074            catch (Exception ex) {
075                throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
076            }
077        }
078    
079        @Override
080        public void destroy() {
081            slaEnabled = false;
082        }
083    
084        @Override
085        public Class<? extends Service> getInterface() {
086            return SLAService.class;
087        }
088    
089        public static boolean isEnabled() {
090            return slaEnabled;
091        }
092    
093        public SLACalculator getSLACalculator() {
094            return calcImpl;
095        }
096    
097        @VisibleForTesting
098        public void runSLAWorker() {
099            new SLAWorker(calcImpl).run();
100        }
101    
102        private class SLAWorker implements Runnable {
103    
104            SLACalculator calc;
105    
106            public SLAWorker(SLACalculator calc) {
107                this.calc = calc;
108            }
109    
110            @Override
111            public void run() {
112                if (Thread.currentThread().isInterrupted()) {
113                    return;
114                }
115                try {
116                    calc.updateAllSlaStatus();
117                }
118                catch (Throwable error) {
119                    XLog.getLog(SLAService.class).debug("Throwable in SLAWorker thread run : ", error);
120                }
121            }
122        }
123    
124        public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
125            try {
126                if (calcImpl.addRegistration(reg.getId(), reg)) {
127                    return true;
128                }
129                else {
130                    LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId());
131                }
132            }
133            catch (JPAExecutorException ex) {
134                LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex);
135            }
136            return false;
137        }
138    
139        public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
140            try {
141                if (calcImpl.updateRegistration(reg.getId(), reg)) {
142                    return true;
143                }
144                else {
145                    LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
146                }
147            }
148            catch (JPAExecutorException ex) {
149                LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
150            }
151            return false;
152        }
153    
154        public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
155                throws ServiceException {
156            try {
157                if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) {
158                    return true;
159                }
160            }
161            catch (JPAExecutorException jpe) {
162                LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId);
163            }
164            return false;
165        }
166    
167        public void removeRegistration(String jobId) {
168            calcImpl.removeRegistration(jobId);
169        }
170    
171    }