020package org.apache.oozie.util;
022import com.codahale.metrics.Counter;
023import com.codahale.metrics.ExponentiallyDecayingReservoir;
024import com.codahale.metrics.Gauge;
025import com.codahale.metrics.Histogram;
026import com.codahale.metrics.JmxReporter;
027import com.codahale.metrics.MetricFilter;
028import com.codahale.metrics.MetricRegistry;
029import com.codahale.metrics.ganglia.GangliaReporter;
030import com.codahale.metrics.graphite.Graphite;
031import com.codahale.metrics.graphite.GraphiteReporter;
032import com.codahale.metrics.json.MetricsModule;
033import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
034import com.fasterxml.jackson.core.JsonProcessingException;
035import com.fasterxml.jackson.databind.ObjectMapper;
040import info.ganglia.gmetric4j.gmetric.GMetric;
041import org.apache.oozie.service.ConfigurationService;
048import java.util.Map;
049import java.util.concurrent.ConcurrentHashMap;
050import java.util.concurrent.ExecutionException;
051import java.util.concurrent.ScheduledExecutorService;
052import java.util.concurrent.TimeUnit;
053import java.util.concurrent.locks.Lock;
054import java.util.concurrent.locks.ReentrantLock;
/**
 * Instrumentation framework that is mostly compatible with {@link Instrumentation} but is backed by Codahale Metrics.  This class
 * was designed to minimize the changes required to switch from {@link Instrumentation} to {@link MetricsInstrumentation} by keeping
 * the same API.  However, certain operations are implemented differently or are no longer needed, and the output format
 * is a little different.  Internally, this class maps Cron to {@link com.codahale.metrics.Timer}, Variable to {@link Gauge},
 * Counter to {@link Counter}, and Sampler to {@link Histogram}.
 */
064public class MetricsInstrumentation extends Instrumentation {
066    private final MetricRegistry metricRegistry;
067    private transient ObjectMapper jsonMapper;
068    private ScheduledExecutorService scheduler;
069    private final LoadingCache<String, Counter> counters;
070    private final Map<String, Gauge> gauges;
071    private final LoadingCache<String, com.codahale.metrics.Timer> timers;
072    private final Map<String, Histogram> histograms;
073    private Lock timersLock;
074    private Lock gaugesLock;
075    private Lock countersLock;
076    private Lock histogramsLock;
078    public static final String EXTERNAL_MONITORING_ENABLE = "oozie.external_monitoring.enable";
079    public static final String EXTERNAL_MONITORING_TYPE = "oozie.external_monitoring.type";
080    public static final String EXTERNAL_MONITORING_ADDRESS = "oozie.external_monitoring.address";
081    public static final String EXTERNAL_MONITORING_PREFIX = "oozie.external_monitoring.metricPrefix";
082    public static final String EXTERNAL_MONITORING_INTERVAL = "oozie.external_monitoring.reporterIntervalSecs";
083    public static final String JMX_MONITORING_ENABLE = "oozie.jmx_monitoring.enable";
084    public static final String GRAPHITE="graphite";
085    public static final String GANGLIA="ganglia";
086    private String metricsAddress;
087    private String metricsHost;
088    private String metricsPrefix;
089    private String metricsServerName;
090    private int metricsPort;
091    private GraphiteReporter graphiteReporter = null;
092    private GangliaReporter gangliaReporter = null;
093    private JmxReporter jmxReporter = null;
094    private long metricsReportIntervalSec;
095    private boolean isExternalMonitoringEnabled;
096    private boolean isJMXMonitoringEnabled;
098    private static final TimeUnit RATE_UNIT = TimeUnit.MILLISECONDS;
099    private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS;
101    protected XLog LOG = XLog.getLog(getClass());
103    /**
104     * Creates the MetricsInstrumentation and starts taking some metrics.
105     */
106    public MetricsInstrumentation() {
107        metricRegistry = new MetricRegistry();
109        isExternalMonitoringEnabled = ConfigurationService.getBoolean(EXTERNAL_MONITORING_ENABLE);
110        if(isExternalMonitoringEnabled) {
111            metricsServerName = ConfigurationService.get(EXTERNAL_MONITORING_TYPE);
112            if (metricsServerName != null) {
113                String modifiedServerName = metricsServerName.trim().toLowerCase();
114                if (modifiedServerName.equals(GRAPHITE) || modifiedServerName.equals(GANGLIA)) {
115                    metricsAddress = ConfigurationService.get(EXTERNAL_MONITORING_ADDRESS);
116                    metricsPrefix = ConfigurationService.get(EXTERNAL_MONITORING_PREFIX);
117                    metricsReportIntervalSec = ConfigurationService.getLong(EXTERNAL_MONITORING_INTERVAL);
118                    LOG.debug("Publishing external monitoring to [{0}]  at host [{1}] every [{2}] seconds with prefix " +
119                            "[{3}]", metricsServerName, metricsAddress, metricsReportIntervalSec, metricsPrefix);
121                    try {
122                        URL url = new URL(metricsAddress);
123                        metricsHost = url.getHost();
124                        metricsPort = url.getPort();
125                    } catch (MalformedURLException e) {
126                        LOG.error("Exception, ", e);
127                    }
129                    if (modifiedServerName.equals(GRAPHITE)) {
130                        Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, metricsPort));
131                        graphiteReporter = GraphiteReporter.forRegistry(metricRegistry).prefixedWith(metricsPrefix)
132                                .convertDurationsTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build(graphite);
133                        graphiteReporter.start(metricsReportIntervalSec, TimeUnit.SECONDS);
134                    }
136                    if (modifiedServerName.equals(GANGLIA)) {
137                        GMetric ganglia;
138                        try {
139                            ganglia = new GMetric(metricsHost, metricsPort, GMetric.UDPAddressingMode.MULTICAST, 1);
140                        } catch (IOException e) {
141                            LOG.error("Exception, ", e);
142                            throw new RuntimeException(e);
143                        }
144                        gangliaReporter = GangliaReporter.forRegistry(metricRegistry).prefixedWith(metricsPrefix)
145                                .convertRatesTo(TimeUnit.SECONDS)
146                                .convertDurationsTo(TimeUnit.MILLISECONDS)
147                                .build(ganglia);
148                        gangliaReporter.start(metricsReportIntervalSec, TimeUnit.SECONDS);
149                    }
150                } else {
151                    throw new RuntimeException("Metrics Server Name should be either graphite or ganglia");
152                }
153            }
154            else {
155                throw new RuntimeException("Metrics Server Name is not specified");
156            }
157        }
159        timersLock = new ReentrantLock();
160        gaugesLock = new ReentrantLock();
161        countersLock = new ReentrantLock();
162        histogramsLock = new ReentrantLock();
164        // Used for writing the json for the metrics (see com.codahale.metrics.servlets.MetricsServlet)
165        // The "false" is to prevent it from printing out all of the values used in the histograms and timers
166        this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(RATE_UNIT, DURATION_UNIT, false));
168        // Register the JVM memory gauges and prefix the keys
169        MemoryUsageGaugeSet memorySet = new MemoryUsageGaugeSet();
170        for (String key : memorySet.getMetrics().keySet()) {
171            metricRegistry.register("jvm", "memory", key), memorySet.getMetrics().get(key));
172        }
174        // By setting this up as a cache, if a counter doesn't exist when we try to retrieve it, it will automatically be created
175        counters = CacheBuilder.newBuilder().build(
176                new CacheLoader<String, Counter>() {
177                    @Override
178                    public Counter load(String key) throws Exception {
179                        Counter counter = new Counter();
180                        metricRegistry.register(key, counter);
181                        return counter;
182                    }
183                }
184        );
185        timers = CacheBuilder.newBuilder().build(
186                new CacheLoader<String, com.codahale.metrics.Timer>() {
187                    @Override
188                    public com.codahale.metrics.Timer load(String key) throws Exception {
189                        com.codahale.metrics.Timer timer
190                                = new com.codahale.metrics.Timer(new ExponentiallyDecayingReservoir());
191                        metricRegistry.register(key, timer);
192                        return timer;
193                    }
194                }
195        );
196        gauges = new ConcurrentHashMap<String, Gauge>();
197        histograms = new ConcurrentHashMap<String, Histogram>();
198        isJMXMonitoringEnabled = ConfigurationService.getBoolean(JMX_MONITORING_ENABLE);
199        if (isJMXMonitoringEnabled) {
200            jmxReporter  = JmxReporter.forRegistry(metricRegistry).build();
201            jmxReporter.start();
202        }
203    }
205    /**
206     * Reporting final metrics into the server before stopping
207     */
208    @Override
209    public void stop() {
210        if (graphiteReporter != null) {
211            try {
212                // reporting final metrics into graphite before stopping
213      ;
214            } finally {
215                graphiteReporter.stop();
216            }
217        }
218        if (gangliaReporter != null) {
219            try {
220                // reporting final metrics into ganglia before stopping
221      ;
222            } finally {
223                gangliaReporter.stop();
224            }
225        }
227        if (jmxReporter != null) {
228            jmxReporter.stop();
229        }
230    }
232    /**
233     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p>
234     * Internally, this is backed by a {@link com.codahale.metrics.Timer}.
235     *
236     * @param group timer group.
237     * @param name timer name.
238     * @param cron cron to add to the timer.
239     */
240    @Override
241    public void addCron(String group, String name, Cron cron) {
242        String key =, name, "timer");
243        try {
244            timersLock.lock();
245            com.codahale.metrics.Timer timer = timers.get(key);
246            timer.update(cron.getOwn(), TimeUnit.MILLISECONDS);
247        } catch(ExecutionException ee) {
248            throw new RuntimeException(ee);
249        } finally {
250            timersLock.unlock();
251        }
252    }
254    /**
255     * Add an instrumentation variable. <p>
256     * Internally, this is backed by a {@link Gauge}.
257     *
258     * @param group counter group.
259     * @param name counter name.
260     * @param variable variable to add.
261     */
262    @Override
263    public void addVariable(String group, String name, final Variable variable) {
264        Gauge gauge = new Gauge() {
265            @Override
266            public Object getValue() {
267                return variable.getValue();
268            }
269        };
270        String key =, name);
272        try {
273            gaugesLock.lock();
274            gauges.put(key, gauge);
275            // Metrics throws an Exception if we don't do this when the key already exists
276            if (metricRegistry.getGauges().containsKey(key)) {
277                XLog.getLog(MetricsInstrumentation.class).debug("A Variable with name [" + key + "] already exists. "
278                        + " The old Variable will be overwritten, but this is not recommended");
279                metricRegistry.remove(key);
280            }
281            metricRegistry.register(key, gauge);
282        } finally {
283            gaugesLock.unlock();
284        }
285    }
287   /**
288     * Increment an instrumentation counter. The counter is created if it does not exists. <p>
289     * Internally, this is backed by a {@link Counter}.
290     *
291     * @param group counter group.
292     * @param name counter name.
293     * @param count increment to add to the counter.
294     */
295    @Override
296    public void incr(String group, String name, long count) {
297        String key =, name);
298        try {
299            countersLock.lock();
300            counters.get(key).inc(count);
301        } catch(ExecutionException ee) {
302            throw new RuntimeException(ee);
303        } finally {
304            countersLock.unlock();
305        }
306    }
308    /**
309     * Add a sampling variable. <p>
310     * Internally, this is backed by a biased (decaying) {@link Histogram}.
311     *
312     * @param group timer group.
313     * @param name timer name.
314     * @param period (ignored)
315     * @param interval sampling frequency, how often the variable is probed.
316     * @param variable variable to sample.
317     */
318    @Override
319    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
320        if (scheduler == null) {
321            throw new IllegalStateException("scheduler not set, cannot sample");
322        }
323        Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir());
324        Sampler sampler = new Sampler(variable, histogram);
325        scheduler.scheduleAtFixedRate(sampler, 0, interval, TimeUnit.SECONDS);
326        String key =, name, "histogram");
327        try {
328            histogramsLock.lock();
329            histograms.put(key, histogram);
330            // Metrics throws an Exception if we don't do this when the key already exists
331            if (metricRegistry.getHistograms().containsKey(key)) {
332                XLog.getLog(MetricsInstrumentation.class).debug("A Sampler with name [" + key + "] already exists. "
333                        + " The old Sampler will be overwritten, but this is not recommended");
334                metricRegistry.remove(key);
335            }
336            metricRegistry.register(key, histogram);
337        } finally {
338            histogramsLock.unlock();
339        }
340    }
342    public static class Sampler implements Runnable {
343        private final Variable<Long> variable;
344        private final Histogram histogram;
345        public Sampler(Variable<Long> variable, Histogram histogram) {
346            this.variable = variable;
347            this.histogram = histogram;
348        }
350        @Override
351        public void run() {
352            histogram.update(variable.getValue());
353        }
354    }
356    /**
357     * Set the scheduler instance to handle the samplers.
358     *
359     * @param scheduler scheduler instance.
360     */
361    @Override
362    public void setScheduler(ScheduledExecutorService scheduler) {
363        this.scheduler = scheduler;
364    }
366    /**
367     * Return the string representation of the instrumentation.  It does a JSON pretty-print.
368     *
369     * @return the string representation of the instrumentation.
370     */
371    @Override
372    public String toString() {
373        try {
374            return jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry);
375        } catch (JsonProcessingException jpe) {
376            throw new RuntimeException(jpe);
377        }
378    }
380    /**
381     * Converts the current state of the metrics and writes them to the OutputStream.
382     *
383     * @param os The OutputStream to write the metrics to
384     * @throws IOException in case of error during writing to the stream
385     */
386    public void writeJSONResponse(OutputStream os) throws IOException {
387        jsonMapper.writer().writeValue(os, metricRegistry);
388    }
390    /**
391     * Returns the MetricRegistry: public for unit tests -- do not use.
392     *
393     * @return the MetricRegistry
394     */
395    @VisibleForTesting
396    MetricRegistry getMetricRegistry() {
397        return metricRegistry;
398    }
400    /**
401     * Not Supported: throws {@link UnsupportedOperationException}
402     *
403     * @return nothing
404     */
405    @Override
406    public Map<String, Map<String, Map<String, Object>>> getAll() {
407        throw new UnsupportedOperationException();
408    }
410    /**
411     * Not Supported: throws {@link UnsupportedOperationException}
412     *
413     * @return nothing
414     */
415    @Override
416    public Map<String, Map<String, Element<Long>>> getCounters() {
417        throw new UnsupportedOperationException();
418    }
420    /**
421     * Not Supported: throws {@link UnsupportedOperationException}
422     *
423     * @return nothing
424     */
425    @Override
426    public Map<String, Map<String, Element<Double>>> getSamplers() {
427        throw new UnsupportedOperationException();
428    }
430    /**
431     * Not Supported: throws {@link UnsupportedOperationException}
432     *
433     * @return nothing
434     */
435    @Override
436    public Map<String, Map<String, Element<Timer>>> getTimers() {
437        throw new UnsupportedOperationException();
438    }
440    /**
441     * Not Supported: throws {@link UnsupportedOperationException}
442     *
443     * @return nothing
444     */
445    @Override
446    public Map<String, Map<String, Element<Variable>>> getVariables() {
447        throw new UnsupportedOperationException();
448    }