This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019
020package org.apache.oozie.util;
021
022import com.codahale.metrics.Counter;
023import com.codahale.metrics.ExponentiallyDecayingReservoir;
024import com.codahale.metrics.Gauge;
025import com.codahale.metrics.Histogram;
026import com.codahale.metrics.MetricRegistry;
027import com.codahale.metrics.json.MetricsModule;
028import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
029import com.fasterxml.jackson.core.JsonProcessingException;
030import com.fasterxml.jackson.databind.ObjectMapper;
031import com.google.common.annotations.VisibleForTesting;
032import com.google.common.cache.CacheBuilder;
033import com.google.common.cache.CacheLoader;
034import com.google.common.cache.LoadingCache;
035
036import java.io.IOException;
037import java.io.OutputStream;
038import java.util.Map;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.ScheduledExecutorService;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.locks.Lock;
044import java.util.concurrent.locks.ReentrantLock;
045
046/**
047 * Instrumentation framework that is mostly compatible with {@link Instrumentation} but is backed by Codahale Metrics.  This class
048 * was designed to minimize the changes required to switch from {@link Instrumentation} to {@link MetricsInstrumentation} by keeping
049 * the same API.  However, certain operations are obviously implemented differently or are no longer needed; and the output format
050 * is a little different.  Internally, this class maps Cron to {@link com.codahale.metrics.Timer}, Variable to {@link Gauge},
051 * counter to {@link Counter}, and Sampler to {@link Histogram}.
052 */
053@SuppressWarnings("unchecked")
054public class MetricsInstrumentation extends Instrumentation {
055
056    private final MetricRegistry metricRegistry;
057    private transient ObjectMapper jsonMapper;
058    private ScheduledExecutorService scheduler;
059    private final LoadingCache<String, Counter> counters;
060    private final Map<String, Gauge> gauges;
061    private final LoadingCache<String, com.codahale.metrics.Timer> timers;
062    private final Map<String, Histogram> histograms;
063    private Lock timersLock;
064    private Lock gaugesLock;
065    private Lock countersLock;
066    private Lock histogramsLock;
067
068    private static final TimeUnit RATE_UNIT = TimeUnit.MILLISECONDS;
069    private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS;
070
071    /**
072     * Creates the MetricsInstrumentation and starts taking some metrics.
073     */
074    public MetricsInstrumentation() {
075        metricRegistry = new MetricRegistry();
076
077        timersLock = new ReentrantLock();
078        gaugesLock = new ReentrantLock();
079        countersLock = new ReentrantLock();
080        histogramsLock = new ReentrantLock();
081
082        // Used for writing the json for the metrics (see com.codahale.metrics.servlets.MetricsServlet)
083        // The "false" is to prevent it from printing out all of the values used in the histograms and timers
084        this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(RATE_UNIT, DURATION_UNIT, false));
085
086        // Register the JVM memory gauges and prefix the keys
087        MemoryUsageGaugeSet memorySet = new MemoryUsageGaugeSet();
088        for (String key : memorySet.getMetrics().keySet()) {
089            metricRegistry.register(MetricRegistry.name("jvm", "memory", key), memorySet.getMetrics().get(key));
090        }
091
092        // By setting this up as a cache, if a counter doesn't exist when we try to retrieve it, it will automatically be created
093        counters = CacheBuilder.newBuilder().build(
094                new CacheLoader<String, Counter>() {
095                    @Override
096                    public Counter load(String key) throws Exception {
097                        Counter counter = new Counter();
098                        metricRegistry.register(key, counter);
099                        return counter;
100                    }
101                }
102        );
103        timers = CacheBuilder.newBuilder().build(
104                new CacheLoader<String, com.codahale.metrics.Timer>() {
105                    @Override
106                    public com.codahale.metrics.Timer load(String key) throws Exception {
107                        com.codahale.metrics.Timer timer
108                                = new com.codahale.metrics.Timer(new ExponentiallyDecayingReservoir());
109                        metricRegistry.register(key, timer);
110                        return timer;
111                    }
112                }
113        );
114        gauges = new ConcurrentHashMap<String, Gauge>();
115        histograms = new ConcurrentHashMap<String, Histogram>();
116    }
117
118    /**
119     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p/>
120     * Internally, this is backed by a {@link com.codahale.metrics.Timer}.
121     *
122     * @param group timer group.
123     * @param name timer name.
124     * @param cron cron to add to the timer.
125     */
126    @Override
127    public void addCron(String group, String name, Cron cron) {
128        String key = MetricRegistry.name(group, name, "timer");
129        try {
130            timersLock.lock();
131            com.codahale.metrics.Timer timer = timers.get(key);
132            timer.update(cron.getOwn(), TimeUnit.MILLISECONDS);
133        } catch(ExecutionException ee) {
134            throw new RuntimeException(ee);
135        } finally {
136            timersLock.unlock();
137        }
138    }
139
140    /**
141     * Add an instrumentation variable. <p/>
142     * Internally, this is backed by a {@link Gauge}.
143     *
144     * @param group counter group.
145     * @param name counter name.
146     * @param variable variable to add.
147     */
148    @Override
149    public void addVariable(String group, String name, final Variable variable) {
150        Gauge gauge = new Gauge() {
151            @Override
152            public Object getValue() {
153                return variable.getValue();
154            }
155        };
156        String key = MetricRegistry.name(group, name);
157
158        try {
159            gaugesLock.lock();
160            gauges.put(key, gauge);
161            // Metrics throws an Exception if we don't do this when the key already exists
162            if (metricRegistry.getGauges().containsKey(key)) {
163                XLog.getLog(MetricsInstrumentation.class).debug("A Variable with name [" + key + "] already exists. "
164                        + " The old Variable will be overwritten, but this is not recommended");
165                metricRegistry.remove(key);
166            }
167            metricRegistry.register(key, gauge);
168        } finally {
169            gaugesLock.unlock();
170        }
171    }
172
173   /**
174     * Increment an instrumentation counter. The counter is created if it does not exists. <p/>
175     * Internally, this is backed by a {@link Counter}.
176     *
177     * @param group counter group.
178     * @param name counter name.
179     * @param count increment to add to the counter.
180     */
181    @Override
182    public void incr(String group, String name, long count) {
183        String key = MetricRegistry.name(group, name);
184        try {
185            countersLock.lock();
186            counters.get(key).inc(count);
187        } catch(ExecutionException ee) {
188            throw new RuntimeException(ee);
189        } finally {
190            countersLock.unlock();
191        }
192    }
193
194    /**
195     * Add a sampling variable. <p/>
196     * Internally, this is backed by a biased (decaying) {@link Histogram}.
197     *
198     * @param group timer group.
199     * @param name timer name.
200     * @param period (ignored)
201     * @param interval sampling frequency, how often the variable is probed.
202     * @param variable variable to sample.
203     */
204    @Override
205    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
206        if (scheduler == null) {
207            throw new IllegalStateException("scheduler not set, cannot sample");
208        }
209        Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir());
210        Sampler sampler = new Sampler(variable, histogram);
211        scheduler.scheduleAtFixedRate(sampler, 0, interval, TimeUnit.SECONDS);
212        String key = MetricRegistry.name(group, name, "histogram");
213        try {
214            histogramsLock.lock();
215            histograms.put(key, histogram);
216            // Metrics throws an Exception if we don't do this when the key already exists
217            if (metricRegistry.getHistograms().containsKey(key)) {
218                XLog.getLog(MetricsInstrumentation.class).debug("A Sampler with name [" + key + "] already exists. "
219                        + " The old Sampler will be overwritten, but this is not recommended");
220                metricRegistry.remove(key);
221            }
222            metricRegistry.register(key, histogram);
223        } finally {
224            histogramsLock.unlock();
225        }
226    }
227
228    public static class Sampler implements Runnable {
229        private final Variable<Long> variable;
230        private final Histogram histogram;
231        public Sampler(Variable<Long> variable, Histogram histogram) {
232            this.variable = variable;
233            this.histogram = histogram;
234        }
235
236        @Override
237        public void run() {
238            histogram.update(variable.getValue());
239        }
240    }
241
242    /**
243     * Set the scheduler instance to handle the samplers.
244     *
245     * @param scheduler scheduler instance.
246     */
247    @Override
248    public void setScheduler(ScheduledExecutorService scheduler) {
249        this.scheduler = scheduler;
250    }
251
252    /**
253     * Return the string representation of the instrumentation.  It does a JSON pretty-print.
254     *
255     * @return the string representation of the instrumentation.
256     */
257    @Override
258    public String toString() {
259        try {
260            return jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
261        } catch (JsonProcessingException jpe) {
262            throw new RuntimeException(jpe);
263        }
264    }
265
266    /**
267     * Converts the current state of the metrics and writes them to the OutputStream.
268     *
269     * @param os The OutputStream to write the metrics to
270     * @throws IOException
271     */
272    public void writeJSONResponse(OutputStream os) throws IOException {
273        jsonMapper.writer().writeValue(os, metricRegistry);
274    }
275
276    /**
277     * Returns the MetricRegistry: public for unit tests -- do not use.
278     *
279     * @return the MetricRegistry
280     */
281    @VisibleForTesting
282    MetricRegistry getMetricRegistry() {
283        return metricRegistry;
284    }
285
286    /**
287     * Not Supported: throws {@link UnsupportedOperationException}
288     *
289     * @return nothing
290     */
291    @Override
292    public Map<String, Map<String, Map<String, Object>>> getAll() {
293        throw new UnsupportedOperationException();
294    }
295
296    /**
297     * Not Supported: throws {@link UnsupportedOperationException}
298     *
299     * @return nothing
300     */
301    @Override
302    public Map<String, Map<String, Element<Long>>> getCounters() {
303        throw new UnsupportedOperationException();
304    }
305
306    /**
307     * Not Supported: throws {@link UnsupportedOperationException}
308     *
309     * @return nothing
310     */
311    @Override
312    public Map<String, Map<String, Element<Double>>> getSamplers() {
313        throw new UnsupportedOperationException();
314    }
315
316    /**
317     * Not Supported: throws {@link UnsupportedOperationException}
318     *
319     * @return nothing
320     */
321    @Override
322    public Map<String, Map<String, Element<Timer>>> getTimers() {
323        throw new UnsupportedOperationException();
324    }
325
326    /**
327     * Not Supported: throws {@link UnsupportedOperationException}
328     *
329     * @return nothing
330     */
331    @Override
332    public Map<String, Map<String, Element<Variable>>> getVariables() {
333        throw new UnsupportedOperationException();
334    }
335}