001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import com.codahale.metrics.Counter;
022import com.codahale.metrics.ExponentiallyDecayingReservoir;
023import com.codahale.metrics.Gauge;
024import com.codahale.metrics.Histogram;
025import com.codahale.metrics.MetricRegistry;
026import com.codahale.metrics.json.MetricsModule;
027import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
028import com.fasterxml.jackson.core.JsonProcessingException;
029import com.fasterxml.jackson.databind.ObjectMapper;
030import com.google.common.annotations.VisibleForTesting;
031import com.google.common.cache.CacheBuilder;
032import com.google.common.cache.CacheLoader;
033import com.google.common.cache.LoadingCache;
034
035import java.io.IOException;
036import java.io.OutputStream;
037import java.util.Map;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.ScheduledExecutorService;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.locks.Lock;
043import java.util.concurrent.locks.ReentrantLock;
044
045/**
046 * Instrumentation framework that is mostly compatible with {@link Instrumentation} but is backed by Codahale Metrics.  This class
047 * was designed to minimize the changes required to switch from {@link Instrumentation} to {@link MetricsInstrumentation} by keeping
048 * the same API.  However, certain operations are obviously implemented differently or are no longer needed; and the output format
049 * is a little different.  Internally, this class maps Cron to {@link com.codahale.metrics.Timer}, Variable to {@link Gauge},
050 * counter to {@link Counter}, and Sampler to {@link Histogram}.
051 */
052@SuppressWarnings("unchecked")
053public class MetricsInstrumentation extends Instrumentation {
054
055    private final MetricRegistry metricRegistry;
056    private transient ObjectMapper jsonMapper;
057    private ScheduledExecutorService scheduler;
058    private final LoadingCache<String, Counter> counters;
059    private final Map<String, Gauge> gauges;
060    private final LoadingCache<String, com.codahale.metrics.Timer> timers;
061    private final Map<String, Histogram> histograms;
062    private Lock timersLock;
063    private Lock gaugesLock;
064    private Lock countersLock;
065    private Lock histogramsLock;
066
067    private static final TimeUnit RATE_UNIT = TimeUnit.MILLISECONDS;
068    private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS;
069
070    /**
071     * Creates the MetricsInstrumentation and starts taking some metrics.
072     */
073    public MetricsInstrumentation() {
074        metricRegistry = new MetricRegistry();
075
076        timersLock = new ReentrantLock();
077        gaugesLock = new ReentrantLock();
078        countersLock = new ReentrantLock();
079        histogramsLock = new ReentrantLock();
080
081        // Used for writing the json for the metrics (see com.codahale.metrics.servlets.MetricsServlet)
082        // The "false" is to prevent it from printing out all of the values used in the histograms and timers
083        this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(RATE_UNIT, DURATION_UNIT, false));
084
085        // Register the JVM memory gauges and prefix the keys
086        MemoryUsageGaugeSet memorySet = new MemoryUsageGaugeSet();
087        for (String key : memorySet.getMetrics().keySet()) {
088            metricRegistry.register(MetricRegistry.name("jvm", "memory", key), memorySet.getMetrics().get(key));
089        }
090
091        // By setting this up as a cache, if a counter doesn't exist when we try to retrieve it, it will automatically be created
092        counters = CacheBuilder.newBuilder().build(
093                new CacheLoader<String, Counter>() {
094                    @Override
095                    public Counter load(String key) throws Exception {
096                        Counter counter = new Counter();
097                        metricRegistry.register(key, counter);
098                        return counter;
099                    }
100                }
101        );
102        timers = CacheBuilder.newBuilder().build(
103                new CacheLoader<String, com.codahale.metrics.Timer>() {
104                    @Override
105                    public com.codahale.metrics.Timer load(String key) throws Exception {
106                        com.codahale.metrics.Timer timer
107                                = new com.codahale.metrics.Timer(new ExponentiallyDecayingReservoir());
108                        metricRegistry.register(key, timer);
109                        return timer;
110                    }
111                }
112        );
113        gauges = new ConcurrentHashMap<String, Gauge>();
114        histograms = new ConcurrentHashMap<String, Histogram>();
115    }
116
117    /**
118     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p/>
119     * Internally, this is backed by a {@link com.codahale.metrics.Timer}.
120     *
121     * @param group timer group.
122     * @param name timer name.
123     * @param cron cron to add to the timer.
124     */
125    @Override
126    public void addCron(String group, String name, Cron cron) {
127        String key = MetricRegistry.name(group, name, "timer");
128        try {
129            timersLock.lock();
130            com.codahale.metrics.Timer timer = timers.get(key);
131            timer.update(cron.getOwn(), TimeUnit.MILLISECONDS);
132        } catch(ExecutionException ee) {
133            throw new RuntimeException(ee);
134        } finally {
135            timersLock.unlock();
136        }
137    }
138
139    /**
140     * Add an instrumentation variable. <p/>
141     * Internally, this is backed by a {@link Gauge}.
142     *
143     * @param group counter group.
144     * @param name counter name.
145     * @param variable variable to add.
146     */
147    @Override
148    public void addVariable(String group, String name, final Variable variable) {
149        Gauge gauge = new Gauge() {
150            @Override
151            public Object getValue() {
152                return variable.getValue();
153            }
154        };
155        String key = MetricRegistry.name(group, name);
156
157        try {
158            gaugesLock.lock();
159            gauges.put(key, gauge);
160            // Metrics throws an Exception if we don't do this when the key already exists
161            if (metricRegistry.getGauges().containsKey(key)) {
162                XLog.getLog(MetricsInstrumentation.class).debug("A Variable with name [" + key + "] already exists. "
163                        + " The old Variable will be overwritten, but this is not recommended");
164                metricRegistry.remove(key);
165            }
166            metricRegistry.register(key, gauge);
167        } finally {
168            gaugesLock.unlock();
169        }
170    }
171
172   /**
173     * Increment an instrumentation counter. The counter is created if it does not exists. <p/>
174     * Internally, this is backed by a {@link Counter}.
175     *
176     * @param group counter group.
177     * @param name counter name.
178     * @param count increment to add to the counter.
179     */
180    @Override
181    public void incr(String group, String name, long count) {
182        String key = MetricRegistry.name(group, name);
183        try {
184            countersLock.lock();
185            counters.get(key).inc(count);
186        } catch(ExecutionException ee) {
187            throw new RuntimeException(ee);
188        } finally {
189            countersLock.unlock();
190        }
191    }
192
193    /**
194     * Add a sampling variable. <p/>
195     * Internally, this is backed by a biased (decaying) {@link Histogram}.
196     *
197     * @param group timer group.
198     * @param name timer name.
199     * @param period (ignored)
200     * @param interval sampling frequency, how often the variable is probed.
201     * @param variable variable to sample.
202     */
203    @Override
204    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
205        if (scheduler == null) {
206            throw new IllegalStateException("scheduler not set, cannot sample");
207        }
208        Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir());
209        Sampler sampler = new Sampler(variable, histogram);
210        scheduler.scheduleAtFixedRate(sampler, 0, interval, TimeUnit.SECONDS);
211        String key = MetricRegistry.name(group, name, "histogram");
212        try {
213            histogramsLock.lock();
214            histograms.put(key, histogram);
215            // Metrics throws an Exception if we don't do this when the key already exists
216            if (metricRegistry.getHistograms().containsKey(key)) {
217                XLog.getLog(MetricsInstrumentation.class).debug("A Sampler with name [" + key + "] already exists. "
218                        + " The old Sampler will be overwritten, but this is not recommended");
219                metricRegistry.remove(key);
220            }
221            metricRegistry.register(key, histogram);
222        } finally {
223            histogramsLock.unlock();
224        }
225    }
226
227    public static class Sampler implements Runnable {
228        private final Variable<Long> variable;
229        private final Histogram histogram;
230        public Sampler(Variable<Long> variable, Histogram histogram) {
231            this.variable = variable;
232            this.histogram = histogram;
233        }
234
235        @Override
236        public void run() {
237            histogram.update(variable.getValue());
238        }
239    }
240
241    /**
242     * Set the scheduler instance to handle the samplers.
243     *
244     * @param scheduler scheduler instance.
245     */
246    @Override
247    public void setScheduler(ScheduledExecutorService scheduler) {
248        this.scheduler = scheduler;
249    }
250
251    /**
252     * Return the string representation of the instrumentation.  It does a JSON pretty-print.
253     *
254     * @return the string representation of the instrumentation.
255     */
256    @Override
257    public String toString() {
258        try {
259            return jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
260        } catch (JsonProcessingException jpe) {
261            throw new RuntimeException(jpe);
262        }
263    }
264
265    /**
266     * Converts the current state of the metrics and writes them to the OutputStream.
267     *
268     * @param os The OutputStream to write the metrics to
269     * @throws IOException
270     */
271    public void writeJSONResponse(OutputStream os) throws IOException {
272        jsonMapper.writer().writeValue(os, metricRegistry);
273    }
274
275    /**
276     * Returns the MetricRegistry: public for unit tests -- do not use.
277     *
278     * @return the MetricRegistry
279     */
280    @VisibleForTesting
281    MetricRegistry getMetricRegistry() {
282        return metricRegistry;
283    }
284
285    /**
286     * Not Supported: throws {@link UnsupportedOperationException}
287     *
288     * @return nothing
289     */
290    @Override
291    public Map<String, Map<String, Map<String, Object>>> getAll() {
292        throw new UnsupportedOperationException();
293    }
294
295    /**
296     * Not Supported: throws {@link UnsupportedOperationException}
297     *
298     * @return nothing
299     */
300    @Override
301    public Map<String, Map<String, Element<Long>>> getCounters() {
302        throw new UnsupportedOperationException();
303    }
304
305    /**
306     * Not Supported: throws {@link UnsupportedOperationException}
307     *
308     * @return nothing
309     */
310    @Override
311    public Map<String, Map<String, Element<Double>>> getSamplers() {
312        throw new UnsupportedOperationException();
313    }
314
315    /**
316     * Not Supported: throws {@link UnsupportedOperationException}
317     *
318     * @return nothing
319     */
320    @Override
321    public Map<String, Map<String, Element<Timer>>> getTimers() {
322        throw new UnsupportedOperationException();
323    }
324
325    /**
326     * Not Supported: throws {@link UnsupportedOperationException}
327     *
328     * @return nothing
329     */
330    @Override
331    public Map<String, Map<String, Element<Variable>>> getVariables() {
332        throw new UnsupportedOperationException();
333    }
334}