001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 020package org.apache.oozie.util; 021 022import com.codahale.metrics.Counter; 023import com.codahale.metrics.ExponentiallyDecayingReservoir; 024import com.codahale.metrics.Gauge; 025import com.codahale.metrics.Histogram; 026import com.codahale.metrics.JmxReporter; 027import com.codahale.metrics.MetricFilter; 028import com.codahale.metrics.MetricRegistry; 029import com.codahale.metrics.ganglia.GangliaReporter; 030import com.codahale.metrics.graphite.Graphite; 031import com.codahale.metrics.graphite.GraphiteReporter; 032import com.codahale.metrics.json.MetricsModule; 033import com.codahale.metrics.jvm.MemoryUsageGaugeSet; 034import com.fasterxml.jackson.core.JsonProcessingException; 035import com.fasterxml.jackson.databind.ObjectMapper; 036import com.google.common.annotations.VisibleForTesting; 037import com.google.common.cache.CacheBuilder; 038import com.google.common.cache.CacheLoader; 039import com.google.common.cache.LoadingCache; 040import info.ganglia.gmetric4j.gmetric.GMetric; 041import org.apache.oozie.service.ConfigurationService; 042 043import java.io.IOException; 044import java.io.OutputStream; 045import java.net.InetSocketAddress; 046import java.net.MalformedURLException; 047import java.net.URL; 048import java.util.Map; 049import java.util.concurrent.ConcurrentHashMap; 050import java.util.concurrent.ExecutionException; 051import java.util.concurrent.ScheduledExecutorService; 052import java.util.concurrent.TimeUnit; 053import java.util.concurrent.locks.Lock; 054import java.util.concurrent.locks.ReentrantLock; 055 056/** 057 * Instrumentation framework that is mostly compatible with {@link Instrumentation} but is backed by Codahale Metrics. This class 058 * was designed to minimize the changes required to switch from {@link Instrumentation} to {@link MetricsInstrumentation} by keeping 059 * the same API. However, certain operations are obviously implemented differently or are no longer needed; and the output format 060 * is a little different. Internally, this class maps Cron to {@link com.codahale.metrics.Timer}, Variable to {@link Gauge}, 061 * counter to {@link Counter}, and Sampler to {@link Histogram}. 062 */ 063@SuppressWarnings("unchecked") 064public class MetricsInstrumentation extends Instrumentation { 065 066 private final MetricRegistry metricRegistry; 067 private transient ObjectMapper jsonMapper; 068 private ScheduledExecutorService scheduler; 069 private final LoadingCache<String, Counter> counters; 070 private final Map<String, Gauge> gauges; 071 private final LoadingCache<String, com.codahale.metrics.Timer> timers; 072 private final Map<String, Histogram> histograms; 073 private Lock timersLock; 074 private Lock gaugesLock; 075 private Lock countersLock; 076 private Lock histogramsLock; 077 078 public static final String EXTERNAL_MONITORING_ENABLE = "oozie.external_monitoring.enable"; 079 public static final String EXTERNAL_MONITORING_TYPE = "oozie.external_monitoring.type"; 080 public static final String EXTERNAL_MONITORING_ADDRESS = "oozie.external_monitoring.address"; 081 public static final String EXTERNAL_MONITORING_PREFIX = "oozie.external_monitoring.metricPrefix"; 082 public static final String EXTERNAL_MONITORING_INTERVAL = "oozie.external_monitoring.reporterIntervalSecs"; 083 public static final String JMX_MONITORING_ENABLE = "oozie.jmx_monitoring.enable"; 084 public static final String GRAPHITE="graphite"; 085 public static final String GANGLIA="ganglia"; 086 private String metricsAddress; 087 private String metricsHost; 088 private String metricsPrefix; 089 private String metricsServerName; 090 private int metricsPort; 091 private GraphiteReporter graphiteReporter = null; 092 private GangliaReporter gangliaReporter = null; 093 private JmxReporter jmxReporter = null; 094 private long metricsReportIntervalSec; 095 private boolean isExternalMonitoringEnabled; 096 private boolean isJMXMonitoringEnabled; 097 098 private static final TimeUnit RATE_UNIT = TimeUnit.MILLISECONDS; 099 private static final TimeUnit DURATION_UNIT = TimeUnit.MILLISECONDS; 100 101 protected XLog LOG = XLog.getLog(getClass()); 102 103 /** 104 * Creates the MetricsInstrumentation and starts taking some metrics. 105 */ 106 public MetricsInstrumentation() { 107 metricRegistry = new MetricRegistry(); 108 109 isExternalMonitoringEnabled = ConfigurationService.getBoolean(EXTERNAL_MONITORING_ENABLE); 110 if(isExternalMonitoringEnabled) { 111 metricsServerName = ConfigurationService.get(EXTERNAL_MONITORING_TYPE); 112 if (metricsServerName != null) { 113 String modifiedServerName = metricsServerName.trim().toLowerCase(); 114 if (modifiedServerName.equals(GRAPHITE) || modifiedServerName.equals(GANGLIA)) { 115 metricsAddress = ConfigurationService.get(EXTERNAL_MONITORING_ADDRESS); 116 metricsPrefix = ConfigurationService.get(EXTERNAL_MONITORING_PREFIX); 117 metricsReportIntervalSec = ConfigurationService.getLong(EXTERNAL_MONITORING_INTERVAL); 118 LOG.debug("Publishing external monitoring to [{0}] at host [{1}] every [{2}] seconds with prefix " + 119 "[{3}]", metricsServerName, metricsAddress, metricsReportIntervalSec, metricsPrefix); 120 121 try { 122 URL url = new URL(metricsAddress); 123 metricsHost = url.getHost(); 124 metricsPort = url.getPort(); 125 } catch (MalformedURLException e) { 126 LOG.error("Exception, ", e); 127 } 128 129 if (modifiedServerName.equals(GRAPHITE)) { 130 Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, metricsPort)); 131 graphiteReporter = GraphiteReporter.forRegistry(metricRegistry).prefixedWith(metricsPrefix) 132 .convertDurationsTo(TimeUnit.SECONDS).filter(MetricFilter.ALL).build(graphite); 133 graphiteReporter.start(metricsReportIntervalSec, TimeUnit.SECONDS); 134 } 135 136 if (modifiedServerName.equals(GANGLIA)) { 137 GMetric ganglia; 138 try { 139 ganglia = new GMetric(metricsHost, metricsPort, GMetric.UDPAddressingMode.MULTICAST, 1); 140 } catch (IOException e) { 141 LOG.error("Exception, ", e); 142 throw new RuntimeException(e); 143 } 144 gangliaReporter = GangliaReporter.forRegistry(metricRegistry).prefixedWith(metricsPrefix) 145 .convertRatesTo(TimeUnit.SECONDS) 146 .convertDurationsTo(TimeUnit.MILLISECONDS) 147 .build(ganglia); 148 gangliaReporter.start(metricsReportIntervalSec, TimeUnit.SECONDS); 149 } 150 } else { 151 throw new RuntimeException("Metrics Server Name should be either graphite or ganglia"); 152 } 153 } 154 else { 155 throw new RuntimeException("Metrics Server Name is not specified"); 156 } 157 } 158 159 timersLock = new ReentrantLock(); 160 gaugesLock = new ReentrantLock(); 161 countersLock = new ReentrantLock(); 162 histogramsLock = new ReentrantLock(); 163 164 // Used for writing the json for the metrics (see com.codahale.metrics.servlets.MetricsServlet) 165 // The "false" is to prevent it from printing out all of the values used in the histograms and timers 166 this.jsonMapper = new ObjectMapper().registerModule(new MetricsModule(RATE_UNIT, DURATION_UNIT, false)); 167 168 // Register the JVM memory gauges and prefix the keys 169 MemoryUsageGaugeSet memorySet = new MemoryUsageGaugeSet(); 170 for (String key : memorySet.getMetrics().keySet()) { 171 metricRegistry.register(MetricRegistry.name("jvm", "memory", key), memorySet.getMetrics().get(key)); 172 } 173 174 // By setting this up as a cache, if a counter doesn't exist when we try to retrieve it, it will automatically be created 175 counters = CacheBuilder.newBuilder().build( 176 new CacheLoader<String, Counter>() { 177 @Override 178 public Counter load(String key) throws Exception { 179 Counter counter = new Counter(); 180 metricRegistry.register(key, counter); 181 return counter; 182 } 183 } 184 ); 185 timers = CacheBuilder.newBuilder().build( 186 new CacheLoader<String, com.codahale.metrics.Timer>() { 187 @Override 188 public com.codahale.metrics.Timer load(String key) throws Exception { 189 com.codahale.metrics.Timer timer 190 = new com.codahale.metrics.Timer(new ExponentiallyDecayingReservoir()); 191 metricRegistry.register(key, timer); 192 return timer; 193 } 194 } 195 ); 196 gauges = new ConcurrentHashMap<String, Gauge>(); 197 histograms = new ConcurrentHashMap<String, Histogram>(); 198 isJMXMonitoringEnabled = ConfigurationService.getBoolean(JMX_MONITORING_ENABLE); 199 if (isJMXMonitoringEnabled) { 200 jmxReporter = JmxReporter.forRegistry(metricRegistry).build(); 201 jmxReporter.start(); 202 } 203 } 204 205 /** 206 * Reporting final metrics into the server before stopping 207 */ 208 @Override 209 public void stop() { 210 if (graphiteReporter != null) { 211 try { 212 // reporting final metrics into graphite before stopping 213 graphiteReporter.report(); 214 } finally { 215 graphiteReporter.stop(); 216 } 217 } 218 if (gangliaReporter != null) { 219 try { 220 // reporting final metrics into ganglia before stopping 221 gangliaReporter.report(); 222 } finally { 223 gangliaReporter.stop(); 224 } 225 } 226 227 if (jmxReporter != null) { 228 jmxReporter.stop(); 229 } 230 } 231 232 /** 233 * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p> 234 * Internally, this is backed by a {@link com.codahale.metrics.Timer}. 235 * 236 * @param group timer group. 237 * @param name timer name. 238 * @param cron cron to add to the timer. 239 */ 240 @Override 241 public void addCron(String group, String name, Cron cron) { 242 String key = MetricRegistry.name(group, name, "timer"); 243 try { 244 timersLock.lock(); 245 com.codahale.metrics.Timer timer = timers.get(key); 246 timer.update(cron.getOwn(), TimeUnit.MILLISECONDS); 247 } catch(ExecutionException ee) { 248 throw new RuntimeException(ee); 249 } finally { 250 timersLock.unlock(); 251 } 252 } 253 254 /** 255 * Add an instrumentation variable. <p> 256 * Internally, this is backed by a {@link Gauge}. 257 * 258 * @param group counter group. 259 * @param name counter name. 260 * @param variable variable to add. 261 */ 262 @Override 263 public void addVariable(String group, String name, final Variable variable) { 264 Gauge gauge = new Gauge() { 265 @Override 266 public Object getValue() { 267 return variable.getValue(); 268 } 269 }; 270 String key = MetricRegistry.name(group, name); 271 272 try { 273 gaugesLock.lock(); 274 gauges.put(key, gauge); 275 // Metrics throws an Exception if we don't do this when the key already exists 276 if (metricRegistry.getGauges().containsKey(key)) { 277 XLog.getLog(MetricsInstrumentation.class).debug("A Variable with name [" + key + "] already exists. " 278 + " The old Variable will be overwritten, but this is not recommended"); 279 metricRegistry.remove(key); 280 } 281 metricRegistry.register(key, gauge); 282 } finally { 283 gaugesLock.unlock(); 284 } 285 } 286 287 /** 288 * Increment an instrumentation counter. The counter is created if it does not exists. <p> 289 * Internally, this is backed by a {@link Counter}. 290 * 291 * @param group counter group. 292 * @param name counter name. 293 * @param count increment to add to the counter. 294 */ 295 @Override 296 public void incr(String group, String name, long count) { 297 String key = MetricRegistry.name(group, name); 298 try { 299 countersLock.lock(); 300 counters.get(key).inc(count); 301 } catch(ExecutionException ee) { 302 throw new RuntimeException(ee); 303 } finally { 304 countersLock.unlock(); 305 } 306 } 307 308 /** 309 * Add a sampling variable. <p> 310 * Internally, this is backed by a biased (decaying) {@link Histogram}. 311 * 312 * @param group timer group. 313 * @param name timer name. 314 * @param period (ignored) 315 * @param interval sampling frequency, how often the variable is probed. 316 * @param variable variable to sample. 317 */ 318 @Override 319 public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) { 320 if (scheduler == null) { 321 throw new IllegalStateException("scheduler not set, cannot sample"); 322 } 323 Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir()); 324 Sampler sampler = new Sampler(variable, histogram); 325 scheduler.scheduleAtFixedRate(sampler, 0, interval, TimeUnit.SECONDS); 326 String key = MetricRegistry.name(group, name, "histogram"); 327 try { 328 histogramsLock.lock(); 329 histograms.put(key, histogram); 330 // Metrics throws an Exception if we don't do this when the key already exists 331 if (metricRegistry.getHistograms().containsKey(key)) { 332 XLog.getLog(MetricsInstrumentation.class).debug("A Sampler with name [" + key + "] already exists. " 333 + " The old Sampler will be overwritten, but this is not recommended"); 334 metricRegistry.remove(key); 335 } 336 metricRegistry.register(key, histogram); 337 } finally { 338 histogramsLock.unlock(); 339 } 340 } 341 342 public static class Sampler implements Runnable { 343 private final Variable<Long> variable; 344 private final Histogram histogram; 345 public Sampler(Variable<Long> variable, Histogram histogram) { 346 this.variable = variable; 347 this.histogram = histogram; 348 } 349 350 @Override 351 public void run() { 352 histogram.update(variable.getValue()); 353 } 354 } 355 356 /** 357 * Set the scheduler instance to handle the samplers. 358 * 359 * @param scheduler scheduler instance. 360 */ 361 @Override 362 public void setScheduler(ScheduledExecutorService scheduler) { 363 this.scheduler = scheduler; 364 } 365 366 /** 367 * Return the string representation of the instrumentation. It does a JSON pretty-print. 368 * 369 * @return the string representation of the instrumentation. 370 */ 371 @Override 372 public String toString() { 373 try { 374 return jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(metricRegistry); 375 } catch (JsonProcessingException jpe) { 376 throw new RuntimeException(jpe); 377 } 378 } 379 380 /** 381 * Converts the current state of the metrics and writes them to the OutputStream. 382 * 383 * @param os The OutputStream to write the metrics to 384 * @throws IOException in case of error during writing to the stream 385 */ 386 public void writeJSONResponse(OutputStream os) throws IOException { 387 jsonMapper.writer().writeValue(os, metricRegistry); 388 } 389 390 /** 391 * Returns the MetricRegistry: public for unit tests -- do not use. 392 * 393 * @return the MetricRegistry 394 */ 395 @VisibleForTesting 396 MetricRegistry getMetricRegistry() { 397 return metricRegistry; 398 } 399 400 /** 401 * Not Supported: throws {@link UnsupportedOperationException} 402 * 403 * @return nothing 404 */ 405 @Override 406 public Map<String, Map<String, Map<String, Object>>> getAll() { 407 throw new UnsupportedOperationException(); 408 } 409 410 /** 411 * Not Supported: throws {@link UnsupportedOperationException} 412 * 413 * @return nothing 414 */ 415 @Override 416 public Map<String, Map<String, Element<Long>>> getCounters() { 417 throw new UnsupportedOperationException(); 418 } 419 420 /** 421 * Not Supported: throws {@link UnsupportedOperationException} 422 * 423 * @return nothing 424 */ 425 @Override 426 public Map<String, Map<String, Element<Double>>> getSamplers() { 427 throw new UnsupportedOperationException(); 428 } 429 430 /** 431 * Not Supported: throws {@link UnsupportedOperationException} 432 * 433 * @return nothing 434 */ 435 @Override 436 public Map<String, Map<String, Element<Timer>>> getTimers() { 437 throw new UnsupportedOperationException(); 438 } 439 440 /** 441 * Not Supported: throws {@link UnsupportedOperationException} 442 * 443 * @return nothing 444 */ 445 @Override 446 public Map<String, Map<String, Element<Variable>>> getVariables() { 447 throw new UnsupportedOperationException(); 448 } 449}