001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 020package org.apache.oozie.util; 021 022import com.codahale.metrics.Counter; 023import com.codahale.metrics.ExponentiallyDecayingReservoir; 024import com.codahale.metrics.Gauge; 025import com.codahale.metrics.Histogram; 026import com.codahale.metrics.MetricRegistry; 027import com.codahale.metrics.json.MetricsModule; 028import com.codahale.metrics.jvm.MemoryUsageGaugeSet; 029import com.fasterxml.jackson.core.JsonProcessingException; 030import com.fasterxml.jackson.databind.ObjectMapper; 031import com.google.common.annotations.VisibleForTesting; 032import com.google.common.cache.CacheBuilder; 033import com.google.common.cache.CacheLoader; 034import com.google.common.cache.LoadingCache; 035 036import java.io.IOException; 037import java.io.OutputStream; 038import java.util.Map; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.ScheduledExecutorService; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.locks.Lock; 044import java.util.concurrent.locks.ReentrantLock; 045 046/** 047 * Instrumentation framework that is mostly compatible with {@link Instrumentation} but is backed by Codahale Metrics. This class 048 * was designed to minimize the changes required to switch from {@link Instrumentation} to {@link MetricsInstrumentation} by keeping 049 * the same API. However, certain operations are obviously implemented differently or are no longer needed; and the output format 050 * is a little different. Internally, this class maps Cron to {@link com.codahale.metrics.Timer}, Variable to {@link Gauge}, 051 * counter to {@link Counter}, and Sampler to {@link Histogram}. 052 */ 053@SuppressWarnings("unchecked") 054public class MetricsInstrumentation extends Instrumentation { 055 056 private final MetricRegistry metricRegistry; 057 private transient ObjectMapper jsonMapper; 058 private ScheduledExecutorService scheduler; 059 private final LoadingCache<String, Counter> counters; 060 private final Map<String, Gauge> gauges; 061 private final LoadingCache<String, com.codahale.metrics.Timer> timers; 062 private final Map<String, Histogram> histograms; 063 private Lock timersLock; 064 private Lock gaugesLock; 065 private Lock countersLock; 066 private Lock histogramsLock; 067 068 private static final TimeUnit RATE_UNIT = TimeUnit.MILLISECONDS; 069 private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS; 070 071 /** 072 * Creates the MetricsInstrumentation and starts taking some metrics. 073 */ 074 public MetricsInstrumentation() { 075 metricRegistry = new MetricRegistry(); 076 077 timersLock = new ReentrantLock(); 078 gaugesLock = new ReentrantLock(); 079 countersLock = new ReentrantLock(); 080 histogramsLock = new ReentrantLock(); 081 082 // Used for writing the json for the metrics (see com.codahale.metrics.servlets.MetricsServlet) 083 // The "false" is to prevent it from printing out all of the values used in the histograms and timers 084 this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(RATE_UNIT, DURATION_UNIT, false)); 085 086 // Register the JVM memory gauges and prefix the keys 087 MemoryUsageGaugeSet memorySet = new MemoryUsageGaugeSet(); 088 for (String key : memorySet.getMetrics().keySet()) { 089 metricRegistry.register(MetricRegistry.name("jvm", "memory", key), memorySet.getMetrics().get(key)); 090 } 091 092 // By setting this up as a cache, if a counter doesn't exist when we try to retrieve it, it will automatically be created 093 counters = CacheBuilder.newBuilder().build( 094 new CacheLoader<String, Counter>() { 095 @Override 096 public Counter load(String key) throws Exception { 097 Counter counter = new Counter(); 098 metricRegistry.register(key, counter); 099 return counter; 100 } 101 } 102 ); 103 timers = CacheBuilder.newBuilder().build( 104 new CacheLoader<String, com.codahale.metrics.Timer>() { 105 @Override 106 public com.codahale.metrics.Timer load(String key) throws Exception { 107 com.codahale.metrics.Timer timer 108 = new com.codahale.metrics.Timer(new ExponentiallyDecayingReservoir()); 109 metricRegistry.register(key, timer); 110 return timer; 111 } 112 } 113 ); 114 gauges = new ConcurrentHashMap<String, Gauge>(); 115 histograms = new ConcurrentHashMap<String, Histogram>(); 116 } 117 118 /** 119 * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p/> 120 * Internally, this is backed by a {@link com.codahale.metrics.Timer}. 121 * 122 * @param group timer group. 123 * @param name timer name. 124 * @param cron cron to add to the timer. 125 */ 126 @Override 127 public void addCron(String group, String name, Cron cron) { 128 String key = MetricRegistry.name(group, name, "timer"); 129 try { 130 timersLock.lock(); 131 com.codahale.metrics.Timer timer = timers.get(key); 132 timer.update(cron.getOwn(), TimeUnit.MILLISECONDS); 133 } catch(ExecutionException ee) { 134 throw new RuntimeException(ee); 135 } finally { 136 timersLock.unlock(); 137 } 138 } 139 140 /** 141 * Add an instrumentation variable. <p/> 142 * Internally, this is backed by a {@link Gauge}. 143 * 144 * @param group counter group. 145 * @param name counter name. 146 * @param variable variable to add. 147 */ 148 @Override 149 public void addVariable(String group, String name, final Variable variable) { 150 Gauge gauge = new Gauge() { 151 @Override 152 public Object getValue() { 153 return variable.getValue(); 154 } 155 }; 156 String key = MetricRegistry.name(group, name); 157 158 try { 159 gaugesLock.lock(); 160 gauges.put(key, gauge); 161 // Metrics throws an Exception if we don't do this when the key already exists 162 if (metricRegistry.getGauges().containsKey(key)) { 163 XLog.getLog(MetricsInstrumentation.class).debug("A Variable with name [" + key + "] already exists. " 164 + " The old Variable will be overwritten, but this is not recommended"); 165 metricRegistry.remove(key); 166 } 167 metricRegistry.register(key, gauge); 168 } finally { 169 gaugesLock.unlock(); 170 } 171 } 172 173 /** 174 * Increment an instrumentation counter. The counter is created if it does not exists. <p/> 175 * Internally, this is backed by a {@link Counter}. 176 * 177 * @param group counter group. 178 * @param name counter name. 179 * @param count increment to add to the counter. 180 */ 181 @Override 182 public void incr(String group, String name, long count) { 183 String key = MetricRegistry.name(group, name); 184 try { 185 countersLock.lock(); 186 counters.get(key).inc(count); 187 } catch(ExecutionException ee) { 188 throw new RuntimeException(ee); 189 } finally { 190 countersLock.unlock(); 191 } 192 } 193 194 /** 195 * Add a sampling variable. <p/> 196 * Internally, this is backed by a biased (decaying) {@link Histogram}. 197 * 198 * @param group timer group. 199 * @param name timer name. 200 * @param period (ignored) 201 * @param interval sampling frequency, how often the variable is probed. 202 * @param variable variable to sample. 203 */ 204 @Override 205 public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) { 206 if (scheduler == null) { 207 throw new IllegalStateException("scheduler not set, cannot sample"); 208 } 209 Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir()); 210 Sampler sampler = new Sampler(variable, histogram); 211 scheduler.scheduleAtFixedRate(sampler, 0, interval, TimeUnit.SECONDS); 212 String key = MetricRegistry.name(group, name, "histogram"); 213 try { 214 histogramsLock.lock(); 215 histograms.put(key, histogram); 216 // Metrics throws an Exception if we don't do this when the key already exists 217 if (metricRegistry.getHistograms().containsKey(key)) { 218 XLog.getLog(MetricsInstrumentation.class).debug("A Sampler with name [" + key + "] already exists. " 219 + " The old Sampler will be overwritten, but this is not recommended"); 220 metricRegistry.remove(key); 221 } 222 metricRegistry.register(key, histogram); 223 } finally { 224 histogramsLock.unlock(); 225 } 226 } 227 228 public static class Sampler implements Runnable { 229 private final Variable<Long> variable; 230 private final Histogram histogram; 231 public Sampler(Variable<Long> variable, Histogram histogram) { 232 this.variable = variable; 233 this.histogram = histogram; 234 } 235 236 @Override 237 public void run() { 238 histogram.update(variable.getValue()); 239 } 240 } 241 242 /** 243 * Set the scheduler instance to handle the samplers. 244 * 245 * @param scheduler scheduler instance. 246 */ 247 @Override 248 public void setScheduler(ScheduledExecutorService scheduler) { 249 this.scheduler = scheduler; 250 } 251 252 /** 253 * Return the string representation of the instrumentation. It does a JSON pretty-print. 254 * 255 * @return the string representation of the instrumentation. 256 */ 257 @Override 258 public String toString() { 259 try { 260 return jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry); 261 } catch (JsonProcessingException jpe) { 262 throw new RuntimeException(jpe); 263 } 264 } 265 266 /** 267 * Converts the current state of the metrics and writes them to the OutputStream. 268 * 269 * @param os The OutputStream to write the metrics to 270 * @throws IOException 271 */ 272 public void writeJSONResponse(OutputStream os) throws IOException { 273 jsonMapper.writer().writeValue(os, metricRegistry); 274 } 275 276 /** 277 * Returns the MetricRegistry: public for unit tests -- do not use. 278 * 279 * @return the MetricRegistry 280 */ 281 @VisibleForTesting 282 MetricRegistry getMetricRegistry() { 283 return metricRegistry; 284 } 285 286 /** 287 * Not Supported: throws {@link UnsupportedOperationException} 288 * 289 * @return nothing 290 */ 291 @Override 292 public Map<String, Map<String, Map<String, Object>>> getAll() { 293 throw new UnsupportedOperationException(); 294 } 295 296 /** 297 * Not Supported: throws {@link UnsupportedOperationException} 298 * 299 * @return nothing 300 */ 301 @Override 302 public Map<String, Map<String, Element<Long>>> getCounters() { 303 throw new UnsupportedOperationException(); 304 } 305 306 /** 307 * Not Supported: throws {@link UnsupportedOperationException} 308 * 309 * @return nothing 310 */ 311 @Override 312 public Map<String, Map<String, Element<Double>>> getSamplers() { 313 throw new UnsupportedOperationException(); 314 } 315 316 /** 317 * Not Supported: throws {@link UnsupportedOperationException} 318 * 319 * @return nothing 320 */ 321 @Override 322 public Map<String, Map<String, Element<Timer>>> getTimers() { 323 throw new UnsupportedOperationException(); 324 } 325 326 /** 327 * Not Supported: throws {@link UnsupportedOperationException} 328 * 329 * @return nothing 330 */ 331 @Override 332 public Map<String, Map<String, Element<Variable>>> getVariables() { 333 throw new UnsupportedOperationException(); 334 } 335}