/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.service.ConfigurationService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements.
 * <p>
 * All instrumentation elements have a group and a name. Elements are kept in two-level maps (group, then name);
 * both levels are concurrent maps, so the live maps handed out by the getters can be read while other threads
 * register new elements.
 */
public class Instrumentation {
    private ScheduledExecutorService scheduler;
    private final Lock counterLock;
    private final Lock timerLock;
    private final Lock variableLock;
    private final Lock samplerLock;
    private Configuration configuration;
    private final Map<String, Map<String, Map<String, Object>>> all;
    private final Map<String, Map<String, Element<Long>>> counters;
    private final Map<String, Map<String, Element<Timer>>> timers;
    private final Map<String, Map<String, Element<Variable>>> variables;
    private final Map<String, Map<String, Element<Double>>> samplers;

    /**
     * Instrumentation constructor.
     */
    @SuppressWarnings("unchecked")
    public Instrumentation() {
        counterLock = new ReentrantLock();
        timerLock = new ReentrantLock();
        variableLock = new ReentrantLock();
        samplerLock = new ReentrantLock();
        all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
        counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
        timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
        variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
        samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
        // The casts through Object only widen the element type to Object for the aggregated, read-oriented view.
        all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
        all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
        all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
        all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
    }

    /**
     * Set the scheduler instance to handle the samplers.
     *
     * @param scheduler scheduler instance.
     */
    public void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Cron is a stopwatch that can be started/stopped several times.
     * <p>
     * This class is not thread safe, it does not need to be.
     * <p>
     * It keeps track of the total time (first start to last stop) and the running time (total time minus the
     * stopped intervals).
     * <p>
     * Once a Cron is complete it must be added to the corresponding group/name in a Instrumentation instance.
     */
    public static class Cron {
        private long start;
        private long end;
        private long lapStart;
        private long own;
        private long total;
        private boolean running;

        /**
         * Creates new Cron, stopped, in zero.
         */
        public Cron() {
            running = false;
        }

        /**
         * Start the cron. Calling it on a cron that is already running has no effect.
         */
        public void start() {
            if (!running) {
                if (lapStart == 0) {
                    lapStart = System.currentTimeMillis();
                    if (start == 0) {
                        // First start ever: it also fixes the beginning of the total interval.
                        start = lapStart;
                        end = start;
                    }
                }
                running = true;
            }
        }

        /**
         * Stops the cron. Calling it on a cron that is not running has no effect.
         */
        public void stop() {
            if (running) {
                end = System.currentTimeMillis();
                if (start == 0) {
                    start = end;
                }
                total = end - start;
                if (lapStart > 0) {
                    own += end - lapStart;
                    lapStart = 0;
                }
                running = false;
            }
        }

        /**
         * Return the start time of the cron. It must be stopped.
         *
         * @return the start time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getStart() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return start;
        }

        /**
         * Return the end time of the cron. It must be stopped.
         *
         * @return the end time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getEnd() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return end;
        }

        /**
         * Return the total time of the cron. It must be stopped.
         *
         * @return the total time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getTotal() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return total;
        }

        /**
         * Return the own time of the cron. It must be stopped.
         *
         * @return the own time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getOwn() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return own;
        }

    }

    /**
     * Gives access to a snapshot of an Instrumentation element (Counter, Timer).
     * <p>
     * Instrumentation element snapshots are returned by the {@link Instrumentation#getCounters()} and
     * {@link Instrumentation#getTimers()} methods.
     */
    public interface Element<T> {

        /**
         * Return the snapshot value of the Instrumentation element.
         *
         * @return the snapshot value of the Instrumentation element.
         */
        T getValue();
    }

    /**
     * Counter Instrumentation element.
     */
    private static class Counter extends AtomicLong implements Element<Long> {

        /**
         * Return the counter snapshot.
         *
         * @return the counter snapshot.
         */
        @Override
        public Long getValue() {
            return get();
        }

        /**
         * Return the String representation of the counter value.
         *
         * @return the String representation of the counter value.
         */
        @Override
        public String toString() {
            return Long.toString(get());
        }

    }

    /**
     * Timer Instrumentation element.
     */
    public static class Timer implements Element<Timer> {
        final Lock lock = new ReentrantLock();
        private long ownTime;
        private long totalTime;
        private long ticks;
        private long ownSquareTime;
        private long totalSquareTime;
        private long ownMinTime;
        private long ownMaxTime;
        private long totalMinTime;
        private long totalMaxTime;

        /**
         * Timer constructor.
         * <p>
         * It is project private for test purposes.
         */
        Timer() {
        }

        /**
         * Return the String representation of the timer value.
         *
         * @return the String representation of the timer value.
         */
        @Override
        public String toString() {
            return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
        }

        /**
         * Return the timer snapshot.
         * <p>
         * The snapshot is a new Timer whose fields are copied while holding this timer's lock, so its values are
         * consistent with each other.
         *
         * @return the timer snapshot.
         */
        @Override
        public Timer getValue() {
            // Acquire before the try: if lock() fails, the finally block must not unlock a lock that is not held.
            lock.lock();
            try {
                Timer timer = new Timer();
                timer.ownTime = ownTime;
                timer.totalTime = totalTime;
                timer.ticks = ticks;
                timer.ownSquareTime = ownSquareTime;
                timer.totalSquareTime = totalSquareTime;
                timer.ownMinTime = ownMinTime;
                timer.ownMaxTime = ownMaxTime;
                timer.totalMinTime = totalMinTime;
                timer.totalMaxTime = totalMaxTime;
                return timer;
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Add a cron to a timer.
         * <p>
         * It is project private for test purposes.
         *
         * @param cron Cron to add, it must be stopped.
         */
        void addCron(Cron cron) {
            lock.lock();
            try {
                long own = cron.getOwn();
                long total = cron.getTotal();
                ownTime += own;
                totalTime += total;
                ticks++;
                ownSquareTime += own * own;
                totalSquareTime += total * total;
                if (ticks == 1) {
                    // First sample: it is both the minimum and the maximum.
                    ownMinTime = own;
                    ownMaxTime = own;
                    totalMinTime = total;
                    totalMaxTime = total;
                }
                else {
                    ownMinTime = Math.min(ownMinTime, own);
                    ownMaxTime = Math.max(ownMaxTime, own);
                    totalMinTime = Math.min(totalMinTime, total);
                    totalMaxTime = Math.max(totalMaxTime, total);
                }
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Return the own accumulated computing time by the timer.
         *
         * @return own accumulated computing time by the timer.
         */
        public long getOwn() {
            return ownTime;
        }

        /**
         * Return the total accumulated computing time by the timer.
         *
         * @return total accumulated computing time by the timer.
         */
        public long getTotal() {
            return totalTime;
        }

        /**
         * Return the number of times a cron was added to the timer.
         *
         * @return the number of times a cron was added to the timer.
         */
        public long getTicks() {
            return ticks;
        }

        /**
         * Return the sum of the square own times.
         * <p>
         * It can be used to calculate the standard deviation.
         *
         * @return the sum of the square own times.
         */
        public long getOwnSquareSum() {
            return ownSquareTime;
        }

        /**
         * Return the sum of the square total times.
         * <p>
         * It can be used to calculate the standard deviation.
         *
         * @return the sum of the square total times.
         */
        public long getTotalSquareSum() {
            return totalSquareTime;
        }

        /**
         * Returns the own minimum time.
         *
         * @return the own minimum time.
         */
        public long getOwnMin() {
            return ownMinTime;
        }

        /**
         * Returns the own maximum time.
         *
         * @return the own maximum time.
         */
        public long getOwnMax() {
            return ownMaxTime;
        }

        /**
         * Returns the total minimum time.
         *
         * @return the total minimum time.
         */
        public long getTotalMin() {
            return totalMinTime;
        }

        /**
         * Returns the total maximum time.
         *
         * @return the total maximum time.
         */
        public long getTotalMax() {
            return totalMaxTime;
        }

        /**
         * Returns the own average time.
         *
         * @return the own average time, <code>0</code> if no cron has been added.
         */
        public long getOwnAvg() {
            return (ticks != 0) ? ownTime / ticks : 0;
        }

        /**
         * Returns the total average time.
         *
         * @return the total average time, <code>0</code> if no cron has been added.
         */
        public long getTotalAvg() {
            return (ticks != 0) ? totalTime / ticks : 0;
        }

        /**
         * Returns the total time standard deviation.
         *
         * @return the total time standard deviation, <code>-1</code> if fewer than two crons have been added.
         */
        public double getTotalStdDev() {
            return evalStdDev(ticks, totalTime, totalSquareTime);
        }

        /**
         * Returns the own time standard deviation.
         *
         * @return the own time standard deviation, <code>-1</code> if fewer than two crons have been added.
         */
        public double getOwnStdDev() {
            return evalStdDev(ticks, ownTime, ownSquareTime);
        }

        /**
         * Compute the sample standard deviation from the number of samples, their sum and their sum of squares.
         *
         * @param n number of samples.
         * @param sn sum of the samples.
         * @param ssn sum of the squares of the samples.
         * @return the sample standard deviation, <code>-1</code> if there are fewer than two samples.
         */
        private double evalStdDev(long n, long sn, long ssn) {
            if (n < 2) {
                return -1;
            }
            // The numerator is computed exactly in long arithmetic; the division must be done in floating point,
            // an integer division would truncate the variance before the square root is taken.
            return Math.sqrt(((double) (n * ssn - sn * sn)) / (n * (n - 1)));
        }

    }

    /**
     * Return the name-to-element map of a group, creating and registering it if the group does not exist yet.
     * <p>
     * The created map is a concurrent map because it is read (by the lookups of this class and by the users of the
     * live maps returned by the getters) without holding the lock that guards element creation.
     *
     * @param groups map holding all the groups of one kind of instrumentation element.
     * @param group group name.
     * @param lock lock guarding the creation of groups in <code>groups</code>.
     * @return the map of the group, never <code>null</code>.
     */
    private static <E> Map<String, E> getOrCreateGroup(Map<String, Map<String, E>> groups, String group, Lock lock) {
        Map<String, E> map = groups.get(group);
        if (map == null) {
            lock.lock();
            try {
                // Check again, another thread may have created the group while this one waited for the lock.
                map = groups.get(group);
                if (map == null) {
                    map = new ConcurrentHashMap<String, E>();
                    groups.put(group, map);
                }
            }
            finally {
                lock.unlock();
            }
        }
        return map;
    }

    /**
     * Add a cron to an instrumentation timer. The timer is created if it does not exist.
     * <p>
     * This method is thread safe.
     *
     * @param group timer group, it cannot be <code>null</code>.
     * @param name timer name, it cannot be <code>null</code>.
     * @param cron cron to add to the timer.
     */
    public void addCron(String group, String name, Cron cron) {
        Map<String, Element<Timer>> map = getOrCreateGroup(timers, group, timerLock);
        Timer timer = (Timer) map.get(name);
        if (timer == null) {
            timerLock.lock();
            try {
                timer = (Timer) map.get(name);
                if (timer == null) {
                    timer = new Timer();
                    map.put(name, timer);
                }
            }
            finally {
                timerLock.unlock();
            }
        }
        timer.addCron(cron);
    }

    /**
     * Increment an instrumentation counter. The counter is created if it does not exist.
     * <p>
     * This method is thread safe.
     *
     * @param group counter group, it cannot be <code>null</code>.
     * @param name counter name, it cannot be <code>null</code>.
     * @param count increment to add to the counter.
     */
    public void incr(String group, String name, long count) {
        Map<String, Element<Long>> map = getOrCreateGroup(counters, group, counterLock);
        Counter counter = (Counter) map.get(name);
        if (counter == null) {
            counterLock.lock();
            try {
                counter = (Counter) map.get(name);
                if (counter == null) {
                    counter = new Counter();
                    map.put(name, counter);
                }
            }
            finally {
                counterLock.unlock();
            }
        }
        counter.addAndGet(count);
    }

    /**
     * Interface for instrumentation variables.
     * <p>
     * For example the database service could expose the number of currently active connections.
     */
    public interface Variable<T> extends Element<T> {
    }

    /**
     * Add an instrumentation variable. The variable must not exist.
     * <p>
     * This method is thread safe.
     *
     * @param group variable group, it cannot be <code>null</code>.
     * @param name variable name, it cannot be <code>null</code>.
     * @param variable variable to add, it cannot be <code>null</code>.
     * @throws RuntimeException if a variable with the same group and name is already defined.
     */
    @SuppressWarnings("unchecked")
    public void addVariable(String group, String name, Variable variable) {
        Map<String, Element<Variable>> map = getOrCreateGroup(variables, group, variableLock);
        // The existence check and the insertion are done under the lock so two threads cannot both pass the check.
        variableLock.lock();
        try {
            if (map.containsKey(name)) {
                throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
            }
            map.put(name, variable);
        }
        finally {
            variableLock.unlock();
        }
    }

    /**
     * Set the system configuration.
     *
     * @param configuration system configuration.
     */
    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }

    /**
     * Return the JVM system properties.
     *
     * @return JVM system properties.
     */
    @SuppressWarnings("unchecked")
    public Map<String, String> getJavaSystemProperties() {
        return (Map<String, String>) (Object) System.getProperties();
    }

    /**
     * Return the OS environment used to start Oozie.
     *
     * @return the OS environment used to start Oozie.
     */
    public Map<String, String> getOSEnv() {
        return System.getenv();
    }

    /**
     * Return the current system configuration as a {@code Map<String,String>}.
     * <p>
     * The returned map is a read-only adapter over the result of
     * <code>ConfigurationService.maskPasswords(configuration)</code>; all mutators and <code>containsValue</code>
     * throw <code>UnsupportedOperationException</code>.
     *
     * @return the current system configuration as a {@code Map<String,String>}.
     */
    public Map<String, String> getConfiguration() {
        final Configuration maskedConf = ConfigurationService.maskPasswords(configuration);

        return new Map<String, String>() {
            @Override
            public int size() {
                return maskedConf.size();
            }

            @Override
            public boolean isEmpty() {
                return maskedConf.size() == 0;
            }

            @Override
            public boolean containsKey(Object o) {
                return maskedConf.get((String) o) != null;
            }

            @Override
            public boolean containsValue(Object o) {
                throw new UnsupportedOperationException();
            }

            @Override
            public String get(Object o) {
                return maskedConf.get((String) o);
            }

            @Override
            public String put(String s, String s1) {
                throw new UnsupportedOperationException();
            }

            @Override
            public String remove(Object o) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void putAll(Map<? extends String, ? extends String> map) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void clear() {
                throw new UnsupportedOperationException();
            }

            @Override
            public Set<String> keySet() {
                Set<String> set = new LinkedHashSet<String>();
                for (Entry<String, String> entry : maskedConf) {
                    set.add(entry.getKey());
                }
                return set;
            }

            @Override
            public Collection<String> values() {
                // A list, not a set: distinct keys may share a value and every one of them must be reported.
                List<String> list = new ArrayList<String>();
                for (Entry<String, String> entry : maskedConf) {
                    list.add(entry.getValue());
                }
                return list;
            }

            @Override
            public Set<Entry<String, String>> entrySet() {
                Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
                for (Entry<String, String> entry : maskedConf) {
                    set.add(entry);
                }
                return set;
            }
        };
    }

    /**
     * Return all the counters.
     * <p>
     * This method is thread safe.
     * <p>
     * The counters are live. The counter value is a snapshot at the time the
     * {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all counters.
     */
    public Map<String, Map<String, Element<Long>>> getCounters() {
        return counters;
    }

    /**
     * Return all the timers.
     * <p>
     * This method is thread safe.
     * <p>
     * The timers are live. Once a timer is obtained, all its values are consistent (they are snapshot at the time
     * the {@link Instrumentation.Element#getValue()} is invoked).
     *
     * @return all timers.
     */
    public Map<String, Map<String, Element<Timer>>> getTimers() {
        return timers;
    }

    /**
     * Return all the variables.
     * <p>
     * This method is thread safe.
     * <p>
     * The variables are live. The variable value is a snapshot at the time the
     * {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all variables.
     */
    public Map<String, Map<String, Element<Variable>>> getVariables() {
        return variables;
    }

    /**
     * Return a map containing all variables, samplers, counters and timers.
     * <p>
     * The map is keyed by element kind (<code>variables</code>, <code>samplers</code>, <code>counters</code>,
     * <code>timers</code>), in that order.
     *
     * @return a map containing all variables, samplers, counters and timers.
     */
    public Map<String, Map<String, Map<String, Object>>> getAll() {
        return all;
    }

    /**
     * Return the string representation of the instrumentation.
     * <p>
     * Element kinds are listed in registration order; groups and names are sorted alphabetically.
     *
     * @return the string representation of the instrumentation.
     */
    @Override
    public String toString() {
        String lineSeparator = System.getProperty("line.separator");
        StringBuilder sb = new StringBuilder(4096);
        for (Map.Entry<String, Map<String, Map<String, Object>>> kind : all.entrySet()) {
            sb.append(kind.getKey()).append(':').append(lineSeparator);
            Map<String, Map<String, Object>> groupMap = kind.getValue();
            List<String> groups = new ArrayList<String>(groupMap.keySet());
            Collections.sort(groups);
            for (String group : groups) {
                sb.append("  ").append(group).append(':').append(lineSeparator);
                Map<String, Object> elements = groupMap.get(group);
                List<String> names = new ArrayList<String>(elements.keySet());
                Collections.sort(names);
                for (String name : names) {
                    Element<?> value = (Element<?>) elements.get(name);
                    sb.append("    ").append(name).append(": ").append(value.getValue()).append(lineSeparator);
                }
            }
        }
        return sb.toString();
    }

    /**
     * Sampler Instrumentation element.
     * <p>
     * Every time it is run it probes a variable and stores the value in a circular window; its value is the sum of
     * the window divided by the window size.
     */
    private static class Sampler implements Element<Double>, Runnable {
        private final Lock lock = new ReentrantLock();
        private final int samplingInterval;
        private final Variable<Long> variable;
        private final long[] values;
        private int current;
        private long valuesSum;
        // Written by the scheduler thread under the lock, read by any thread without it.
        private volatile double rate;

        /**
         * Sampler constructor.
         *
         * @param samplingPeriod period covered by the sampling window.
         * @param samplingInterval interval between two probes; the window holds
         * <code>samplingPeriod / samplingInterval</code> values.
         * @param variable variable to probe.
         */
        public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
            this.samplingInterval = samplingInterval;
            this.variable = variable;
            values = new long[samplingPeriod / samplingInterval];
            valuesSum = 0;
            current = -1;
        }

        /**
         * Return the interval between two probes of the variable.
         *
         * @return the sampling interval.
         */
        public int getSamplingInterval() {
            return samplingInterval;
        }

        /**
         * Probe the variable, store the value in the window (replacing the oldest one once the window has wrapped)
         * and recompute the rate.
         */
        @Override
        public void run() {
            lock.lock();
            try {
                long newValue = variable.getValue();
                if (current == -1) {
                    // Very first probe: the window only holds this value.
                    valuesSum = newValue;
                    current = 0;
                    values[current] = newValue;
                }
                else {
                    current = (current + 1) % values.length;
                    valuesSum = valuesSum - values[current] + newValue;
                    values[current] = newValue;
                }
                rate = ((double) valuesSum) / values.length;
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Return the rate computed by the last probe.
         *
         * @return the rate computed by the last probe, <code>0</code> if the variable has not been probed yet.
         */
        @Override
        public Double getValue() {
            return rate;
        }
    }

    /**
     * Add a sampling variable.
     * <p>
     * This method is thread safe.
     *
     * @param group sampler group, it cannot be <code>null</code>.
     * @param name sampler name, it cannot be <code>null</code>.
     * @param period sampling period to compute rate, it must be at least <code>interval</code>.
     * @param interval sampling frequency, how often the variable is probed, in seconds. It must be greater than zero.
     * @param variable variable to sample.
     * @throws IllegalStateException if the scheduler has not been set.
     * @throws IllegalArgumentException if <code>interval</code> is not positive or <code>period</code> is smaller
     * than <code>interval</code>.
     * @throws RuntimeException if a sampler with the same group and name is already defined.
     */
    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
        if (scheduler == null) {
            throw new IllegalStateException("scheduler not set, cannot sample");
        }
        // A non-positive interval or a period shorter than the interval would produce a division by zero or an
        // empty sampling window, and the sampler would then fail on every run inside the scheduler.
        if (interval <= 0 || period < interval) {
            throw new IllegalArgumentException("invalid sampling period=[" + period + "] interval=[" + interval
                    + "], interval must be > 0 and period must be >= interval");
        }
        samplerLock.lock();
        try {
            Map<String, Element<Double>> map = samplers.get(group);
            if (map == null) {
                map = new ConcurrentHashMap<String, Element<Double>>();
                samplers.put(group, map);
            }
            if (map.containsKey(name)) {
                throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
            }
            Sampler sampler = new Sampler(period, interval, variable);
            map.put(name, sampler);
            scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
        }
        finally {
            samplerLock.unlock();
        }
    }

    /**
     * Return all the samplers.
     * <p>
     * This method is thread safe.
     * <p>
     * The samplers are live. The sampler value is a snapshot at the time the
     * {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all samplers.
     */
    public Map<String, Map<String, Element<Double>>> getSamplers() {
        return samplers;
    }

}