/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements.
 * <p>
 * All instrumentation elements have a group and a name. Neither the group nor the name may be <code>null</code>:
 * elements are stored in concurrent maps, which reject <code>null</code> keys and values.
 */
public class Instrumentation {
    private ScheduledExecutorService scheduler;
    private final Lock counterLock;
    private final Lock timerLock;
    private final Lock variableLock;
    private final Lock samplerLock;
    private final Map<String, Map<String, Map<String, Object>>> all;
    private final Map<String, Map<String, Element<Long>>> counters;
    private final Map<String, Map<String, Element<Timer>>> timers;
    private final Map<String, Map<String, Element<Variable>>> variables;
    private final Map<String, Map<String, Element<Double>>> samplers;

    /**
     * Instrumentation constructor.
     * <p>
     * Creates the four empty element registries and indexes them in the map returned by {@link #getAll()} under
     * the keys <code>variables</code>, <code>samplers</code>, <code>counters</code> and <code>timers</code>, in
     * that order.
     */
    @SuppressWarnings("unchecked")
    public Instrumentation() {
        counterLock = new ReentrantLock();
        timerLock = new ReentrantLock();
        variableLock = new ReentrantLock();
        samplerLock = new ReentrantLock();
        all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
        counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
        timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
        variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
        samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
        // The double cast through Object is required because the registries have different element types;
        // 'all' is only used to read the elements back as Element instances.
        all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
        all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
        all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
        all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
    }

    /**
     * Set the scheduler instance to handle the samplers.
     *
     * @param scheduler scheduler instance.
     */
    public void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Return the element map of a group, creating and registering it if the group does not exist yet.
     * <p>
     * The unlocked lookup is safe because the registry is a concurrent map; the lock only serializes creation so
     * that a group map is never replaced once it has been published.
     *
     * @param registry registry holding the groups of one kind of element.
     * @param group group name.
     * @param lock lock guarding creation in the given registry.
     * @return the element map of the group, never <code>null</code>.
     */
    private static <T> Map<String, Element<T>> getOrCreateGroup(Map<String, Map<String, Element<T>>> registry,
                                                                String group, Lock lock) {
        Map<String, Element<T>> map = registry.get(group);
        if (map == null) {
            lock.lock();
            try {
                map = registry.get(group);
                if (map == null) {
                    // Concurrent map: the group is read without holding the lock, both by this class and by
                    // callers that obtained the live registries through the getters.
                    map = new ConcurrentHashMap<String, Element<T>>();
                    registry.put(group, map);
                }
            }
            finally {
                lock.unlock();
            }
        }
        return map;
    }

    /**
     * Cron is a stopwatch that can be started/stopped several times.
     * <p>
     * This class is not thread safe, it does not need to be.
     * <p>
     * It keeps track of the total time (first start to last stop) and the running time (total time minus the
     * stopped intervals).
     * <p>
     * Once a Cron is complete it must be added to the corresponding group/name in an Instrumentation instance.
     */
    public static class Cron {
        private long start;
        private long end;
        private long lapStart;
        private long own;
        private long total;
        private boolean running;

        /**
         * Creates new Cron, stopped, in zero.
         */
        public Cron() {
            running = false;
        }

        /**
         * Start the cron. Starting a cron that is already running has no effect.
         */
        public void start() {
            if (!running) {
                if (lapStart == 0) {
                    lapStart = System.currentTimeMillis();
                    if (start == 0) {
                        // First start ever: this is the origin of the total time.
                        start = lapStart;
                        end = start;
                    }
                }
                running = true;
            }
        }

        /**
         * Stops the cron. Stopping a cron that is not running has no effect.
         */
        public void stop() {
            if (running) {
                end = System.currentTimeMillis();
                if (start == 0) {
                    start = end;
                }
                total = end - start;
                if (lapStart > 0) {
                    // Only the lap that just finished counts as own (running) time.
                    own += end - lapStart;
                    lapStart = 0;
                }
                running = false;
            }
        }

        /**
         * Return the start time of the cron. It must be stopped.
         *
         * @return the start time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getStart() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return start;
        }

        /**
         * Return the end time of the cron. It must be stopped.
         *
         * @return the end time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getEnd() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return end;
        }

        /**
         * Return the total time of the cron. It must be stopped.
         *
         * @return the total time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getTotal() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return total;
        }

        /**
         * Return the own time of the cron. It must be stopped.
         *
         * @return the own time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getOwn() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return own;
        }

    }

    /**
     * Gives access to a snapshot of an Instrumentation element (Counter, Timer).
     * <p>
     * Instrumentation element snapshots are returned by the {@link Instrumentation#getCounters()} and
     * {@link Instrumentation#getTimers()} methods.
     */
    public interface Element<T> {

        /**
         * Return the snapshot value of the Instrumentation element.
         *
         * @return the snapshot value of the Instrumentation element.
         */
        T getValue();
    }

    /**
     * Counter Instrumentation element.
     */
    private static class Counter extends AtomicLong implements Element<Long> {
        private static final long serialVersionUID = 1L;

        /**
         * Return the counter snapshot.
         *
         * @return the counter snapshot.
         */
        @Override
        public Long getValue() {
            return get();
        }

        /**
         * Return the String representation of the counter value.
         *
         * @return the String representation of the counter value.
         */
        @Override
        public String toString() {
            return Long.toString(get());
        }

    }

    /**
     * Timer Instrumentation element.
     * <p>
     * Updates ({@link #addCron(Cron)}) and snapshots ({@link #getValue()}) are guarded by the timer lock. The
     * plain getters do not take the lock; to read a consistent set of values, call them on the snapshot
     * returned by {@link #getValue()}.
     */
    public static class Timer implements Element<Timer> {
        final Lock lock = new ReentrantLock();
        private long ownTime;
        private long totalTime;
        private long ticks;
        private long ownSquareTime;
        private long totalSquareTime;
        private long ownMinTime;
        private long ownMaxTime;
        private long totalMinTime;
        private long totalMaxTime;

        /**
         * Timer constructor.
         * <p>
         * It is package private for test purposes.
         */
        Timer() {
        }

        /**
         * Return the String representation of the timer value.
         *
         * @return the String representation of the timer value.
         */
        @Override
        public String toString() {
            return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
        }

        /**
         * Return the timer snapshot.
         *
         * @return a new Timer holding a copy of all the values of this timer, taken under the timer lock.
         */
        @Override
        public Timer getValue() {
            Timer snapshot = new Timer();
            // Acquire outside the try block so that unlock() only runs when the lock is actually held.
            lock.lock();
            try {
                snapshot.ownTime = ownTime;
                snapshot.totalTime = totalTime;
                snapshot.ticks = ticks;
                snapshot.ownSquareTime = ownSquareTime;
                snapshot.totalSquareTime = totalSquareTime;
                snapshot.ownMinTime = ownMinTime;
                snapshot.ownMaxTime = ownMaxTime;
                snapshot.totalMinTime = totalMinTime;
                snapshot.totalMaxTime = totalMaxTime;
            }
            finally {
                lock.unlock();
            }
            return snapshot;
        }

        /**
         * Add a cron to a timer.
         * <p>
         * It is package private for test purposes.
         *
         * @param cron Cron to add, it must be stopped.
         * @throws IllegalStateException if the cron is running.
         */
        void addCron(Cron cron) {
            // Read the cron first: a running cron throws here, before the timer is locked or modified.
            long own = cron.getOwn();
            long total = cron.getTotal();
            lock.lock();
            try {
                ownTime += own;
                totalTime += total;
                ticks++;
                ownSquareTime += own * own;
                totalSquareTime += total * total;
                if (ticks == 1) {
                    // First sample defines both extremes.
                    ownMinTime = own;
                    ownMaxTime = own;
                    totalMinTime = total;
                    totalMaxTime = total;
                }
                else {
                    ownMinTime = Math.min(ownMinTime, own);
                    ownMaxTime = Math.max(ownMaxTime, own);
                    totalMinTime = Math.min(totalMinTime, total);
                    totalMaxTime = Math.max(totalMaxTime, total);
                }
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Return the own accumulated computing time by the timer.
         *
         * @return own accumulated computing time by the timer.
         */
        public long getOwn() {
            return ownTime;
        }

        /**
         * Return the total accumulated computing time by the timer.
         *
         * @return total accumulated computing time by the timer.
         */
        public long getTotal() {
            return totalTime;
        }

        /**
         * Return the number of times a cron was added to the timer.
         *
         * @return the number of times a cron was added to the timer.
         */
        public long getTicks() {
            return ticks;
        }

        /**
         * Return the sum of the square own times.
         * <p>
         * It can be used to calculate the standard deviation.
         *
         * @return the sum of the square own times.
         */
        public long getOwnSquareSum() {
            return ownSquareTime;
        }

        /**
         * Return the sum of the square total times.
         * <p>
         * It can be used to calculate the standard deviation.
         *
         * @return the sum of the square total times.
         */
        public long getTotalSquareSum() {
            return totalSquareTime;
        }

        /**
         * Returns the own minimum time.
         *
         * @return the own minimum time.
         */
        public long getOwnMin() {
            return ownMinTime;
        }

        /**
         * Returns the own maximum time.
         *
         * @return the own maximum time.
         */
        public long getOwnMax() {
            return ownMaxTime;
        }

        /**
         * Returns the total minimum time.
         *
         * @return the total minimum time.
         */
        public long getTotalMin() {
            return totalMinTime;
        }

        /**
         * Returns the total maximum time.
         *
         * @return the total maximum time.
         */
        public long getTotalMax() {
            return totalMaxTime;
        }

        /**
         * Returns the own average time.
         *
         * @return the own average time (integer division), or zero if no cron has been added.
         */
        public long getOwnAvg() {
            return (ticks != 0) ? ownTime / ticks : 0;
        }

        /**
         * Returns the total average time.
         *
         * @return the total average time (integer division), or zero if no cron has been added.
         */
        public long getTotalAvg() {
            return (ticks != 0) ? totalTime / ticks : 0;
        }

        /**
         * Returns the total time standard deviation.
         *
         * @return the total time standard deviation, or <code>-1</code> if fewer than two crons have been added.
         */
        public double getTotalStdDev() {
            return evalStdDev(ticks, totalTime, totalSquareTime);
        }

        /**
         * Returns the own time standard deviation.
         *
         * @return the own time standard deviation, or <code>-1</code> if fewer than two crons have been added.
         */
        public double getOwnStdDev() {
            return evalStdDev(ticks, ownTime, ownSquareTime);
        }

        /**
         * Evaluate the sample standard deviation <code>sqrt((n * ssn - sn^2) / (n * (n - 1)))</code>.
         * <p>
         * The computation is done in floating point: dividing as <code>long</code> before taking the square
         * root would truncate the variance (two samples of 1 and 2 would yield 0 instead of ~0.707), and the
         * products could overflow the <code>long</code> range.
         *
         * @param n number of samples.
         * @param sn sum of the samples.
         * @param ssn sum of the squares of the samples.
         * @return the standard deviation, or <code>-1</code> if there are fewer than two samples.
         */
        private static double evalStdDev(long n, long sn, long ssn) {
            if (n < 2) {
                return -1;
            }
            double count = n;
            double variance = (count * ssn - (double) sn * sn) / (count * (count - 1));
            // Rounding can leave a tiny negative value when all the samples are equal; clamp to avoid NaN.
            return Math.sqrt(Math.max(variance, 0.0));
        }

    }

    /**
     * Add a cron to an instrumentation timer. The timer is created if it does not exist.
     * <p>
     * This method is thread safe.
     *
     * @param group timer group.
     * @param name timer name.
     * @param cron cron to add to the timer, it must be stopped.
     */
    public void addCron(String group, String name, Cron cron) {
        Map<String, Element<Timer>> map = getOrCreateGroup(timers, group, timerLock);
        Timer timer = (Timer) map.get(name);
        if (timer == null) {
            timerLock.lock();
            try {
                // Re-check under the lock: another thread may have created the timer in the meantime.
                timer = (Timer) map.get(name);
                if (timer == null) {
                    timer = new Timer();
                    map.put(name, timer);
                }
            }
            finally {
                timerLock.unlock();
            }
        }
        timer.addCron(cron);
    }

    /**
     * Increment an instrumentation counter. The counter is created if it does not exist.
     * <p>
     * This method is thread safe.
     *
     * @param group counter group.
     * @param name counter name.
     * @param count increment to add to the counter.
     */
    public void incr(String group, String name, long count) {
        Map<String, Element<Long>> map = getOrCreateGroup(counters, group, counterLock);
        Counter counter = (Counter) map.get(name);
        if (counter == null) {
            counterLock.lock();
            try {
                // Re-check under the lock: another thread may have created the counter in the meantime.
                counter = (Counter) map.get(name);
                if (counter == null) {
                    counter = new Counter();
                    map.put(name, counter);
                }
            }
            finally {
                counterLock.unlock();
            }
        }
        counter.addAndGet(count);
    }

    /**
     * Interface for instrumentation variables.
     * <p>
     * For example the database service could expose the number of currently active connections.
     */
    public interface Variable<T> extends Element<T> {
    }

    /**
     * Add an instrumentation variable. The variable must not exist.
     * <p>
     * This method is thread safe.
     *
     * @param group variable group.
     * @param name variable name.
     * @param variable variable to add.
     * @throws RuntimeException if a variable with the same group and name is already defined.
     */
    @SuppressWarnings("unchecked")
    public void addVariable(String group, String name, Variable variable) {
        Map<String, Element<Variable>> map = getOrCreateGroup(variables, group, variableLock);
        // The existence check and the insertion must be atomic, otherwise two threads could both register
        // the same name and one variable would silently replace the other.
        variableLock.lock();
        try {
            if (map.containsKey(name)) {
                throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group,
                                                       name));
            }
            map.put(name, variable);
        }
        finally {
            variableLock.unlock();
        }
    }

    /**
     * Return the JVM system properties.
     *
     * @return JVM system properties (the live {@link System#getProperties()} object viewed as a map).
     */
    @SuppressWarnings("unchecked")
    public Map<String, String> getJavaSystemProperties() {
        return (Map<String, String>) (Object) System.getProperties();
    }

    /**
     * Return the OS environment used to start Oozie.
     *
     * @return the OS environment used to start Oozie.
     */
    public Map<String, String> getOSEnv() {
        return System.getenv();
    }

    /**
     * Return the current system configuration as a {@code Map<String,String>}.
     * <p>
     * The returned map is a read-only view over the masked configuration obtained from the
     * {@link ConfigurationService}: every mutator, as well as <code>containsValue</code>, throws
     * {@link UnsupportedOperationException}. The key, value and entry collections are copies built on each call.
     *
     * @return the current system configuration as a {@code Map<String,String>}.
     */
    public Map<String, String> getConfiguration() {
        final Configuration maskedConf = Services.get().get(ConfigurationService.class).getMaskedConfiguration();

        return new Map<String, String>() {
            @Override
            public int size() {
                return maskedConf.size();
            }

            @Override
            public boolean isEmpty() {
                return maskedConf.size() == 0;
            }

            @Override
            public boolean containsKey(Object o) {
                return maskedConf.get((String) o) != null;
            }

            @Override
            public boolean containsValue(Object o) {
                throw new UnsupportedOperationException();
            }

            @Override
            public String get(Object o) {
                return maskedConf.get((String) o);
            }

            @Override
            public String put(String s, String s1) {
                throw new UnsupportedOperationException();
            }

            @Override
            public String remove(Object o) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void putAll(Map<? extends String, ? extends String> map) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void clear() {
                throw new UnsupportedOperationException();
            }

            @Override
            public Set<String> keySet() {
                Set<String> keys = new LinkedHashSet<String>();
                for (Entry<String, String> entry : maskedConf) {
                    keys.add(entry.getKey());
                }
                return keys;
            }

            @Override
            public Collection<String> values() {
                // A list, not a set: distinct keys may carry equal values and each one must be reported.
                List<String> values = new ArrayList<String>();
                for (Entry<String, String> entry : maskedConf) {
                    values.add(entry.getValue());
                }
                return values;
            }

            @Override
            public Set<Entry<String, String>> entrySet() {
                Set<Entry<String, String>> entries = new LinkedHashSet<Entry<String, String>>();
                for (Entry<String, String> entry : maskedConf) {
                    entries.add(entry);
                }
                return entries;
            }
        };
    }

    /**
     * Return all the counters.
     * <p>
     * This method is thread safe.
     * <p>
     * The counters are live. The counter value is a snapshot at the time the
     * {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all counters.
     */
    public Map<String, Map<String, Element<Long>>> getCounters() {
        return counters;
    }

    /**
     * Return all the timers.
     * <p>
     * This method is thread safe.
     * <p>
     * The timers are live. Once a timer is obtained, all its values are consistent (they are a snapshot at the
     * time the {@link Instrumentation.Element#getValue()} is invoked).
     *
     * @return all timers.
     */
    public Map<String, Map<String, Element<Timer>>> getTimers() {
        return timers;
    }

    /**
     * Return all the variables.
     * <p>
     * This method is thread safe.
     * <p>
     * The variables are live. The variable value is a snapshot at the time the
     * {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all variables.
     */
    public Map<String, Map<String, Element<Variable>>> getVariables() {
        return variables;
    }

    /**
     * Return a map containing all variables, samplers, counters and timers.
     *
     * @return a map containing all variables, samplers, counters and timers, keyed by kind of element, then
     *         by group, then by name.
     */
    public Map<String, Map<String, Map<String, Object>>> getAll() {
        return all;
    }

    /**
     * Return the string representation of the instrumentation.
     * <p>
     * Elements are listed by kind; within a kind the groups, and within a group the names, are sorted
     * alphabetically.
     *
     * @return the string representation of the instrumentation.
     */
    @Override
    public String toString() {
        String E = System.getProperty("line.separator");
        StringBuilder sb = new StringBuilder(4096);
        for (Map.Entry<String, Map<String, Map<String, Object>>> kind : all.entrySet()) {
            sb.append(kind.getKey()).append(':').append(E);
            Map<String, Map<String, Object>> groupMaps = kind.getValue();
            List<String> groups = new ArrayList<String>(groupMaps.keySet());
            Collections.sort(groups);
            for (String group : groups) {
                sb.append(" ").append(group).append(':').append(E);
                Map<String, Object> elements = groupMaps.get(group);
                List<String> names = new ArrayList<String>(elements.keySet());
                Collections.sort(names);
                for (String name : names) {
                    Element<?> element = (Element<?>) elements.get(name);
                    sb.append(" ").append(name).append(": ").append(element.getValue()).append(E);
                }
            }
        }
        return sb.toString();
    }

    /**
     * Sampler Instrumentation element.
     * <p>
     * Each run probes a variable and stores the value in a circular window of
     * <code>samplingPeriod / samplingInterval</code> slots; the sampler value is the sum of the window divided
     * by the number of slots (slots not filled yet count as zero).
     */
    private static class Sampler implements Element<Double>, Runnable {
        private final Lock lock = new ReentrantLock();
        private final int samplingInterval;
        private final Variable<Long> variable;
        private final long[] values;
        private int current;
        private long valuesSum;
        // Written under the lock by run(), read without it by getValue().
        private volatile double rate;

        /**
         * Sampler constructor.
         *
         * @param samplingPeriod length of the sampling window, in the same unit as the interval.
         * @param samplingInterval how often the variable is probed.
         * @param variable variable to probe.
         * @throws IllegalArgumentException if the interval is not positive or the period is shorter than the
         * interval (the window would have no slots).
         */
        public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
            if (samplingInterval <= 0) {
                throw new IllegalArgumentException("sampling interval must be greater than zero: "
                                                   + samplingInterval);
            }
            if (samplingPeriod < samplingInterval) {
                throw new IllegalArgumentException("sampling period [" + samplingPeriod
                                                   + "] must not be shorter than the sampling interval ["
                                                   + samplingInterval + "]");
            }
            this.samplingInterval = samplingInterval;
            this.variable = variable;
            values = new long[samplingPeriod / samplingInterval];
            valuesSum = 0;
            current = -1;
        }

        /**
         * Return the sampling interval.
         *
         * @return the sampling interval.
         */
        public int getSamplingInterval() {
            return samplingInterval;
        }

        /**
         * Probe the variable, store the value in the window and refresh the sampler value.
         */
        @Override
        public void run() {
            // Probe before locking so that the variable implementation never runs under the sampler lock.
            long newValue = variable.getValue();
            lock.lock();
            try {
                if (current == -1) {
                    // First sample.
                    valuesSum = newValue;
                    current = 0;
                    values[current] = newValue;
                }
                else {
                    // Overwrite the oldest slot, keeping the running sum in step.
                    current = (current + 1) % values.length;
                    valuesSum = valuesSum - values[current] + newValue;
                    values[current] = newValue;
                }
                rate = ((double) valuesSum) / values.length;
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Return the sampler snapshot.
         *
         * @return the value computed by the most recent run, zero if the sampler has not run yet.
         */
        @Override
        public Double getValue() {
            return rate;
        }
    }

    /**
     * Add a sampling variable.
     * <p>
     * This method is thread safe.
     * <p>
     * The sampler is scheduled at a fixed rate on the scheduler set with
     * {@link #setScheduler(ScheduledExecutorService)}, starting immediately.
     *
     * @param group sampler group.
     * @param name sampler name.
     * @param period sampling period to compute rate, in seconds.
     * @param interval sampling frequency, how often the variable is probed, in seconds.
     * @param variable variable to sample.
     * @throws IllegalStateException if the scheduler has not been set.
     * @throws IllegalArgumentException if the interval is not positive or the period is shorter than the
     * interval.
     * @throws RuntimeException if a sampler with the same group and name is already defined.
     */
    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
        if (scheduler == null) {
            throw new IllegalStateException("scheduler not set, cannot sample");
        }
        samplerLock.lock();
        try {
            Map<String, Element<Double>> map = samplers.get(group);
            if (map == null) {
                map = new ConcurrentHashMap<String, Element<Double>>();
                samplers.put(group, map);
            }
            if (map.containsKey(name)) {
                throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group,
                                                       name));
            }
            Sampler sampler = new Sampler(period, interval, variable);
            map.put(name, sampler);
            scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
        }
        finally {
            samplerLock.unlock();
        }
    }

    /**
     * Return all the samplers.
     * <p>
     * This method is thread safe.
     * <p>
     * The samplers are live. The sampler value is a snapshot at the time the
     * {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all samplers.
     */
    public Map<String, Map<String, Element<Double>>> getSamplers() {
        return samplers;
    }

}