001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.util; 020 021import com.codahale.metrics.Counter; 022import com.codahale.metrics.ExponentiallyDecayingReservoir; 023import com.codahale.metrics.Gauge; 024import com.codahale.metrics.Histogram; 025import com.codahale.metrics.MetricRegistry; 026import com.codahale.metrics.json.MetricsModule; 027import com.codahale.metrics.jvm.MemoryUsageGaugeSet; 028import com.fasterxml.jackson.core.JsonProcessingException; 029import com.fasterxml.jackson.databind.ObjectMapper; 030import com.google.common.annotations.VisibleForTesting; 031import com.google.common.cache.CacheBuilder; 032import com.google.common.cache.CacheLoader; 033import com.google.common.cache.LoadingCache; 034 035import java.io.IOException; 036import java.io.OutputStream; 037import java.util.Map; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ExecutionException; 040import java.util.concurrent.ScheduledExecutorService; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.locks.Lock; 043import java.util.concurrent.locks.ReentrantLock; 044 045/** 046 * Instrumentation framework that is mostly compatible with {@link Instrumentation} but is backed by Codahale Metrics. This class 047 * was designed to minimize the changes required to switch from {@link Instrumentation} to {@link MetricsInstrumentation} by keeping 048 * the same API. However, certain operations are obviously implemented differently or are no longer needed; and the output format 049 * is a little different. Internally, this class maps Cron to {@link com.codahale.metrics.Timer}, Variable to {@link Gauge}, 050 * counter to {@link Counter}, and Sampler to {@link Histogram}. 051 */ 052@SuppressWarnings("unchecked") 053public class MetricsInstrumentation extends Instrumentation { 054 055 private final MetricRegistry metricRegistry; 056 private transient ObjectMapper jsonMapper; 057 private ScheduledExecutorService scheduler; 058 private final LoadingCache<String, Counter> counters; 059 private final Map<String, Gauge> gauges; 060 private final LoadingCache<String, com.codahale.metrics.Timer> timers; 061 private final Map<String, Histogram> histograms; 062 private Lock timersLock; 063 private Lock gaugesLock; 064 private Lock countersLock; 065 private Lock histogramsLock; 066 067 private static final TimeUnit RATE_UNIT = TimeUnit.MILLISECONDS; 068 private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS; 069 070 /** 071 * Creates the MetricsInstrumentation and starts taking some metrics. 072 */ 073 public MetricsInstrumentation() { 074 metricRegistry = new MetricRegistry(); 075 076 timersLock = new ReentrantLock(); 077 gaugesLock = new ReentrantLock(); 078 countersLock = new ReentrantLock(); 079 histogramsLock = new ReentrantLock(); 080 081 // Used for writing the json for the metrics (see com.codahale.metrics.servlets.MetricsServlet) 082 // The "false" is to prevent it from printing out all of the values used in the histograms and timers 083 this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(RATE_UNIT, DURATION_UNIT, false)); 084 085 // Register the JVM memory gauges and prefix the keys 086 MemoryUsageGaugeSet memorySet = new MemoryUsageGaugeSet(); 087 for (String key : memorySet.getMetrics().keySet()) { 088 metricRegistry.register(MetricRegistry.name("jvm", "memory", key), memorySet.getMetrics().get(key)); 089 } 090 091 // By setting this up as a cache, if a counter doesn't exist when we try to retrieve it, it will automatically be created 092 counters = CacheBuilder.newBuilder().build( 093 new CacheLoader<String, Counter>() { 094 @Override 095 public Counter load(String key) throws Exception { 096 Counter counter = new Counter(); 097 metricRegistry.register(key, counter); 098 return counter; 099 } 100 } 101 ); 102 timers = CacheBuilder.newBuilder().build( 103 new CacheLoader<String, com.codahale.metrics.Timer>() { 104 @Override 105 public com.codahale.metrics.Timer load(String key) throws Exception { 106 com.codahale.metrics.Timer timer 107 = new com.codahale.metrics.Timer(new ExponentiallyDecayingReservoir()); 108 metricRegistry.register(key, timer); 109 return timer; 110 } 111 } 112 ); 113 gauges = new ConcurrentHashMap<String, Gauge>(); 114 histograms = new ConcurrentHashMap<String, Histogram>(); 115 } 116 117 /** 118 * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p/> 119 * Internally, this is backed by a {@link com.codahale.metrics.Timer}. 120 * 121 * @param group timer group. 122 * @param name timer name. 123 * @param cron cron to add to the timer. 124 */ 125 @Override 126 public void addCron(String group, String name, Cron cron) { 127 String key = MetricRegistry.name(group, name, "timer"); 128 try { 129 timersLock.lock(); 130 com.codahale.metrics.Timer timer = timers.get(key); 131 timer.update(cron.getOwn(), TimeUnit.MILLISECONDS); 132 } catch(ExecutionException ee) { 133 throw new RuntimeException(ee); 134 } finally { 135 timersLock.unlock(); 136 } 137 } 138 139 /** 140 * Add an instrumentation variable. <p/> 141 * Internally, this is backed by a {@link Gauge}. 142 * 143 * @param group counter group. 144 * @param name counter name. 145 * @param variable variable to add. 146 */ 147 @Override 148 public void addVariable(String group, String name, final Variable variable) { 149 Gauge gauge = new Gauge() { 150 @Override 151 public Object getValue() { 152 return variable.getValue(); 153 } 154 }; 155 String key = MetricRegistry.name(group, name); 156 157 try { 158 gaugesLock.lock(); 159 gauges.put(key, gauge); 160 // Metrics throws an Exception if we don't do this when the key already exists 161 if (metricRegistry.getGauges().containsKey(key)) { 162 XLog.getLog(MetricsInstrumentation.class).debug("A Variable with name [" + key + "] already exists. " 163 + " The old Variable will be overwritten, but this is not recommended"); 164 metricRegistry.remove(key); 165 } 166 metricRegistry.register(key, gauge); 167 } finally { 168 gaugesLock.unlock(); 169 } 170 } 171 172 /** 173 * Increment an instrumentation counter. The counter is created if it does not exists. <p/> 174 * Internally, this is backed by a {@link Counter}. 175 * 176 * @param group counter group. 177 * @param name counter name. 178 * @param count increment to add to the counter. 179 */ 180 @Override 181 public void incr(String group, String name, long count) { 182 String key = MetricRegistry.name(group, name); 183 try { 184 countersLock.lock(); 185 counters.get(key).inc(count); 186 } catch(ExecutionException ee) { 187 throw new RuntimeException(ee); 188 } finally { 189 countersLock.unlock(); 190 } 191 } 192 193 /** 194 * Add a sampling variable. <p/> 195 * Internally, this is backed by a biased (decaying) {@link Histogram}. 196 * 197 * @param group timer group. 198 * @param name timer name. 199 * @param period (ignored) 200 * @param interval sampling frequency, how often the variable is probed. 201 * @param variable variable to sample. 202 */ 203 @Override 204 public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) { 205 if (scheduler == null) { 206 throw new IllegalStateException("scheduler not set, cannot sample"); 207 } 208 Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir()); 209 Sampler sampler = new Sampler(variable, histogram); 210 scheduler.scheduleAtFixedRate(sampler, 0, interval, TimeUnit.SECONDS); 211 String key = MetricRegistry.name(group, name, "histogram"); 212 try { 213 histogramsLock.lock(); 214 histograms.put(key, histogram); 215 // Metrics throws an Exception if we don't do this when the key already exists 216 if (metricRegistry.getHistograms().containsKey(key)) { 217 XLog.getLog(MetricsInstrumentation.class).debug("A Sampler with name [" + key + "] already exists. " 218 + " The old Sampler will be overwritten, but this is not recommended"); 219 metricRegistry.remove(key); 220 } 221 metricRegistry.register(key, histogram); 222 } finally { 223 histogramsLock.unlock(); 224 } 225 } 226 227 public static class Sampler implements Runnable { 228 private final Variable<Long> variable; 229 private final Histogram histogram; 230 public Sampler(Variable<Long> variable, Histogram histogram) { 231 this.variable = variable; 232 this.histogram = histogram; 233 } 234 235 @Override 236 public void run() { 237 histogram.update(variable.getValue()); 238 } 239 } 240 241 /** 242 * Set the scheduler instance to handle the samplers. 243 * 244 * @param scheduler scheduler instance. 245 */ 246 @Override 247 public void setScheduler(ScheduledExecutorService scheduler) { 248 this.scheduler = scheduler; 249 } 250 251 /** 252 * Return the string representation of the instrumentation. It does a JSON pretty-print. 253 * 254 * @return the string representation of the instrumentation. 255 */ 256 @Override 257 public String toString() { 258 try { 259 return jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry); 260 } catch (JsonProcessingException jpe) { 261 throw new RuntimeException(jpe); 262 } 263 } 264 265 /** 266 * Converts the current state of the metrics and writes them to the OutputStream. 267 * 268 * @param os The OutputStream to write the metrics to 269 * @throws IOException 270 */ 271 public void writeJSONResponse(OutputStream os) throws IOException { 272 jsonMapper.writer().writeValue(os, metricRegistry); 273 } 274 275 /** 276 * Returns the MetricRegistry: public for unit tests -- do not use. 277 * 278 * @return the MetricRegistry 279 */ 280 @VisibleForTesting 281 MetricRegistry getMetricRegistry() { 282 return metricRegistry; 283 } 284 285 /** 286 * Not Supported: throws {@link UnsupportedOperationException} 287 * 288 * @return nothing 289 */ 290 @Override 291 public Map<String, Map<String, Map<String, Object>>> getAll() { 292 throw new UnsupportedOperationException(); 293 } 294 295 /** 296 * Not Supported: throws {@link UnsupportedOperationException} 297 * 298 * @return nothing 299 */ 300 @Override 301 public Map<String, Map<String, Element<Long>>> getCounters() { 302 throw new UnsupportedOperationException(); 303 } 304 305 /** 306 * Not Supported: throws {@link UnsupportedOperationException} 307 * 308 * @return nothing 309 */ 310 @Override 311 public Map<String, Map<String, Element<Double>>> getSamplers() { 312 throw new UnsupportedOperationException(); 313 } 314 315 /** 316 * Not Supported: throws {@link UnsupportedOperationException} 317 * 318 * @return nothing 319 */ 320 @Override 321 public Map<String, Map<String, Element<Timer>>> getTimers() { 322 throw new UnsupportedOperationException(); 323 } 324 325 /** 326 * Not Supported: throws {@link UnsupportedOperationException} 327 * 328 * @return nothing 329 */ 330 @Override 331 public Map<String, Map<String, Element<Variable>>> getVariables() { 332 throw new UnsupportedOperationException(); 333 } 334}