001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.util; 020 021import com.google.common.collect.Maps; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.oozie.action.hadoop.PasswordMasker; 024import org.apache.oozie.service.ConfigurationService; 025import org.apache.oozie.service.Services; 026 027import java.util.ArrayList; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.LinkedHashMap; 032import java.util.LinkedHashSet; 033import java.util.List; 034import java.util.Map; 035import java.util.Set; 036import java.util.concurrent.ConcurrentHashMap; 037import java.util.concurrent.ScheduledExecutorService; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicLong; 040import java.util.concurrent.locks.Lock; 041import java.util.concurrent.locks.ReentrantLock; 042 043/** 044 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p> All 045 * instrumentation elements have a group and a name. 046 */ 047public class Instrumentation { 048 private ScheduledExecutorService scheduler; 049 private Lock counterLock; 050 private Lock timerLock; 051 private Lock variableLock; 052 private Lock samplerLock; 053 private Map<String, Map<String, Map<String, Object>>> all; 054 private Map<String, Map<String, Element<Long>>> counters; 055 private Map<String, Map<String, Element<Timer>>> timers; 056 private Map<String, Map<String, Element<Variable>>> variables; 057 private Map<String, Map<String, Element<Double>>> samplers; 058 059 /** 060 * Instrumentation constructor. 061 */ 062 @SuppressWarnings("unchecked") 063 public Instrumentation() { 064 counterLock = new ReentrantLock(); 065 timerLock = new ReentrantLock(); 066 variableLock = new ReentrantLock(); 067 samplerLock = new ReentrantLock(); 068 all = new LinkedHashMap<String, Map<String, Map<String, Object>>>(); 069 counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>(); 070 timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>(); 071 variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>(); 072 samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>(); 073 all.put("variables", (Map<String, Map<String, Object>>) (Object) variables); 074 all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers); 075 all.put("counters", (Map<String, Map<String, Object>>) (Object) counters); 076 all.put("timers", (Map<String, Map<String, Object>>) (Object) timers); 077 } 078 079 /** 080 * Set the scheduler instance to handle the samplers. 081 * 082 * @param scheduler scheduler instance. 083 */ 084 public void setScheduler(ScheduledExecutorService scheduler) { 085 this.scheduler = scheduler; 086 } 087 088 /** 089 * Cron is a stopwatch that can be started/stopped several times. <p> This class is not thread safe, it does not 090 * need to be. <p> It keeps track of the total time (first start to last stop) and the running time (total time 091 * minus the stopped intervals). <p> Once a Cron is complete it must be added to the corresponding group/name in a 092 * Instrumentation instance. 093 */ 094 public static class Cron { 095 private long start; 096 private long end; 097 private long lapStart; 098 private long own; 099 private long total; 100 private boolean running; 101 102 /** 103 * Creates new Cron, stopped, in zero. 104 */ 105 public Cron() { 106 running = false; 107 } 108 109 /** 110 * Start the cron. It cannot be already started. 111 */ 112 public void start() { 113 if (!running) { 114 if (lapStart == 0) { 115 lapStart = System.currentTimeMillis(); 116 if (start == 0) { 117 start = lapStart; 118 end = start; 119 } 120 } 121 running = true; 122 } 123 } 124 125 /** 126 * Stops the cron. It cannot be already stopped. 127 */ 128 public void stop() { 129 if (running) { 130 end = System.currentTimeMillis(); 131 if (start == 0) { 132 start = end; 133 } 134 total = end - start; 135 if (lapStart > 0) { 136 own += end - lapStart; 137 lapStart = 0; 138 } 139 running = false; 140 } 141 } 142 143 /** 144 * Return the start time of the cron. It must be stopped. 145 * 146 * @return the start time of the cron. 147 */ 148 public long getStart() { 149 if (running) { 150 throw new IllegalStateException("Timer running"); 151 } 152 return start; 153 } 154 155 /** 156 * Return the end time of the cron. It must be stopped. 157 * 158 * @return the end time of the cron. 159 */ 160 public long getEnd() { 161 if (running) { 162 throw new IllegalStateException("Timer running"); 163 } 164 return end; 165 } 166 167 /** 168 * Return the total time of the cron. It must be stopped. 169 * 170 * @return the total time of the cron. 171 */ 172 public long getTotal() { 173 if (running) { 174 throw new IllegalStateException("Timer running"); 175 } 176 return total; 177 } 178 179 /** 180 * Return the own time of the cron. It must be stopped. 181 * 182 * @return the own time of the cron. 183 */ 184 public long getOwn() { 185 if (running) { 186 throw new IllegalStateException("Timer running"); 187 } 188 return own; 189 } 190 191 } 192 193 /** 194 * Gives access to a snapshot of an Instrumentation element (Counter, Timer). <p> Instrumentation element snapshots 195 * are returned by the {@link Instrumentation#getCounters()} and {@link Instrumentation#getTimers()} ()} methods. 196 */ 197 public interface Element<T> { 198 199 /** 200 * Return the snapshot value of the Intrumentation element. 201 * 202 * @return the snapshot value of the Intrumentation element. 203 */ 204 T getValue(); 205 } 206 207 /** 208 * Counter Instrumentation element. 209 */ 210 private static class Counter extends AtomicLong implements Element<Long> { 211 212 /** 213 * Return the counter snapshot. 214 * 215 * @return the counter snapshot. 216 */ 217 public Long getValue() { 218 return get(); 219 } 220 221 /** 222 * Return the String representation of the counter value. 223 * 224 * @return the String representation of the counter value. 225 */ 226 public String toString() { 227 return Long.toString(get()); 228 } 229 230 } 231 232 /** 233 * Timer Instrumentation element. 234 */ 235 public static class Timer implements Element<Timer> { 236 Lock lock = new ReentrantLock(); 237 private long ownTime; 238 private long totalTime; 239 private long ticks; 240 private long ownSquareTime; 241 private long totalSquareTime; 242 private long ownMinTime; 243 private long ownMaxTime; 244 private long totalMinTime; 245 private long totalMaxTime; 246 247 /** 248 * Timer constructor. <p> It is project private for test purposes. 249 */ 250 Timer() { 251 } 252 253 /** 254 * Return the String representation of the timer value. 255 * 256 * @return the String representation of the timer value. 257 */ 258 public String toString() { 259 return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg()); 260 } 261 262 /** 263 * Return the timer snapshot. 264 * 265 * @return the timer snapshot. 266 */ 267 public Timer getValue() { 268 try { 269 lock.lock(); 270 Timer timer = new Timer(); 271 timer.ownTime = ownTime; 272 timer.totalTime = totalTime; 273 timer.ticks = ticks; 274 timer.ownSquareTime = ownSquareTime; 275 timer.totalSquareTime = totalSquareTime; 276 timer.ownMinTime = ownMinTime; 277 timer.ownMaxTime = ownMaxTime; 278 timer.totalMinTime = totalMinTime; 279 timer.totalMaxTime = totalMaxTime; 280 return timer; 281 } 282 finally { 283 lock.unlock(); 284 } 285 } 286 287 /** 288 * Add a cron to a timer. <p> It is project private for test purposes. 289 * 290 * @param cron Cron to add. 291 */ 292 void addCron(Cron cron) { 293 try { 294 lock.lock(); 295 long own = cron.getOwn(); 296 long total = cron.getTotal(); 297 ownTime += own; 298 totalTime += total; 299 ticks++; 300 ownSquareTime += own * own; 301 totalSquareTime += total * total; 302 if (ticks == 1) { 303 ownMinTime = own; 304 ownMaxTime = own; 305 totalMinTime = total; 306 totalMaxTime = total; 307 } 308 else { 309 ownMinTime = Math.min(ownMinTime, own); 310 ownMaxTime = Math.max(ownMaxTime, own); 311 totalMinTime = Math.min(totalMinTime, total); 312 totalMaxTime = Math.max(totalMaxTime, total); 313 } 314 } 315 finally { 316 lock.unlock(); 317 } 318 } 319 320 /** 321 * Return the own accumulated computing time by the timer. 322 * 323 * @return own accumulated computing time by the timer. 324 */ 325 public long getOwn() { 326 return ownTime; 327 } 328 329 /** 330 * Return the total accumulated computing time by the timer. 331 * 332 * @return total accumulated computing time by the timer. 333 */ 334 public long getTotal() { 335 return totalTime; 336 } 337 338 /** 339 * Return the number of times a cron was added to the timer. 340 * 341 * @return the number of times a cron was added to the timer. 342 */ 343 public long getTicks() { 344 return ticks; 345 } 346 347 /** 348 * Return the sum of the square own times. <p> It can be used to calculate the standard deviation. 349 * 350 * @return the sum of the square own timer. 351 */ 352 public long getOwnSquareSum() { 353 return ownSquareTime; 354 } 355 356 /** 357 * Return the sum of the square total times. <p> It can be used to calculate the standard deviation. 358 * 359 * @return the sum of the square own timer. 360 */ 361 public long getTotalSquareSum() { 362 return totalSquareTime; 363 } 364 365 /** 366 * Returns the own minimum time. 367 * 368 * @return the own minimum time. 369 */ 370 public long getOwnMin() { 371 return ownMinTime; 372 } 373 374 /** 375 * Returns the own maximum time. 376 * 377 * @return the own maximum time. 378 */ 379 public long getOwnMax() { 380 return ownMaxTime; 381 } 382 383 /** 384 * Returns the total minimum time. 385 * 386 * @return the total minimum time. 387 */ 388 public long getTotalMin() { 389 return totalMinTime; 390 } 391 392 /** 393 * Returns the total maximum time. 394 * 395 * @return the total maximum time. 396 */ 397 public long getTotalMax() { 398 return totalMaxTime; 399 } 400 401 /** 402 * Returns the own average time. 403 * 404 * @return the own average time. 405 */ 406 public long getOwnAvg() { 407 return (ticks != 0) ? ownTime / ticks : 0; 408 } 409 410 /** 411 * Returns the total average time. 412 * 413 * @return the total average time. 414 */ 415 public long getTotalAvg() { 416 return (ticks != 0) ? totalTime / ticks : 0; 417 } 418 419 /** 420 * Returns the total time standard deviation. 421 * 422 * @return the total time standard deviation. 423 */ 424 public double getTotalStdDev() { 425 return evalStdDev(ticks, totalTime, totalSquareTime); 426 } 427 428 /** 429 * Returns the own time standard deviation. 430 * 431 * @return the own time standard deviation. 432 */ 433 public double getOwnStdDev() { 434 return evalStdDev(ticks, ownTime, ownSquareTime); 435 } 436 437 private double evalStdDev(long n, long sn, long ssn) { 438 return (n < 2) ? -1 : Math.sqrt((n * ssn - sn * sn) / (n * (n - 1))); 439 } 440 441 } 442 443 /** 444 * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p> This method is thread 445 * safe. 446 * 447 * @param group timer group. 448 * @param name timer name. 449 * @param cron cron to add to the timer. 450 */ 451 public void addCron(String group, String name, Cron cron) { 452 Map<String, Element<Timer>> map = timers.get(group); 453 if (map == null) { 454 try { 455 timerLock.lock(); 456 map = timers.get(group); 457 if (map == null) { 458 map = new HashMap<String, Element<Timer>>(); 459 timers.put(group, map); 460 } 461 } 462 finally { 463 timerLock.unlock(); 464 } 465 } 466 Timer timer = (Timer) map.get(name); 467 if (timer == null) { 468 try { 469 timerLock.lock(); 470 timer = (Timer) map.get(name); 471 if (timer == null) { 472 timer = new Timer(); 473 map.put(name, timer); 474 } 475 } 476 finally { 477 timerLock.unlock(); 478 } 479 } 480 timer.addCron(cron); 481 } 482 483 /** 484 * Increment an instrumentation counter. The counter is created if it does not exists. <p> This method is thread 485 * safe. 486 * 487 * @param group counter group. 488 * @param name counter name. 489 * @param count increment to add to the counter. 490 */ 491 public void incr(String group, String name, long count) { 492 Map<String, Element<Long>> map = counters.get(group); 493 if (map == null) { 494 try { 495 counterLock.lock(); 496 map = counters.get(group); 497 if (map == null) { 498 map = new HashMap<String, Element<Long>>(); 499 counters.put(group, map); 500 } 501 } 502 finally { 503 counterLock.unlock(); 504 } 505 } 506 Counter counter = (Counter) map.get(name); 507 if (counter == null) { 508 try { 509 counterLock.lock(); 510 counter = (Counter) map.get(name); 511 if (counter == null) { 512 counter = new Counter(); 513 map.put(name, counter); 514 } 515 } 516 finally { 517 counterLock.unlock(); 518 } 519 } 520 counter.addAndGet(count); 521 } 522 523 /** 524 * Decrement an instrumentation counter. The counter is created if it does not exists. <p> This method is thread 525 * safe. 526 * 527 * @param group counter group. 528 * @param name counter name. 529 * @param count decrement to add to the counter. 530 */ 531 public void decr(final String group, final String name, final long count) { 532 incr(group, name, -count); 533 } 534 535 /** 536 * Interface for instrumentation variables. <p> For example a the database service could expose the number of 537 * currently active connections. 538 */ 539 public interface Variable<T> extends Element<T> { 540 } 541 542 /** 543 * Add an instrumentation variable. The variable must not exist. <p> This method is thread safe. 544 * 545 * @param group counter group. 546 * @param name counter name. 547 * @param variable variable to add. 548 */ 549 @SuppressWarnings("unchecked") 550 public void addVariable(String group, String name, Variable variable) { 551 Map<String, Element<Variable>> map = variables.get(group); 552 if (map == null) { 553 try { 554 variableLock.lock(); 555 map = variables.get(group); 556 if (map == null) { 557 map = new HashMap<String, Element<Variable>>(); 558 variables.put(group, map); 559 } 560 } 561 finally { 562 variableLock.unlock(); 563 } 564 } 565 if (map.containsKey(name)) { 566 throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name)); 567 } 568 map.put(name, variable); 569 } 570 571 /** 572 * Return the JVM system properties. 573 * 574 * @return JVM system properties. 575 */ 576 public Map<String, String> getJavaSystemProperties() { 577 Map<String, String> unmasked = Maps.fromProperties(System.getProperties()); 578 return new PasswordMasker().mask(unmasked); 579 } 580 581 /** 582 * Return the OS environment used to start Oozie. 583 * 584 * @return the OS environment used to start Oozie. 585 */ 586 public Map<String, String> getOSEnv() { 587 Map<String, String> unmasked = System.getenv(); 588 return new PasswordMasker().mask(unmasked); 589 } 590 591 /** 592 * Return the current system configuration as a Map<String,String>. 593 * 594 * @return the current system configuration as a Map<String,String>. 595 */ 596 public Map<String, String> getConfiguration() { 597 final Configuration maskedConf = Services.get().get(ConfigurationService.class).getMaskedConfiguration(); 598 599 return new Map<String, String>() { 600 public int size() { 601 return maskedConf.size(); 602 } 603 604 public boolean isEmpty() { 605 return maskedConf.size() == 0; 606 } 607 608 public boolean containsKey(Object o) { 609 return maskedConf.get((String) o) != null; 610 } 611 612 public boolean containsValue(Object o) { 613 throw new UnsupportedOperationException(); 614 } 615 616 public String get(Object o) { 617 return maskedConf.get((String) o); 618 } 619 620 public String put(String s, String s1) { 621 throw new UnsupportedOperationException(); 622 } 623 624 public String remove(Object o) { 625 throw new UnsupportedOperationException(); 626 } 627 628 public void putAll(Map<? extends String, ? extends String> map) { 629 throw new UnsupportedOperationException(); 630 } 631 632 public void clear() { 633 throw new UnsupportedOperationException(); 634 } 635 636 public Set<String> keySet() { 637 Set<String> set = new LinkedHashSet<String>(); 638 for (Entry<String, String> entry : maskedConf) { 639 set.add(entry.getKey()); 640 } 641 return set; 642 } 643 644 public Collection<String> values() { 645 Set<String> set = new LinkedHashSet<String>(); 646 for (Entry<String, String> entry : maskedConf) { 647 set.add(entry.getValue()); 648 } 649 return set; 650 } 651 652 public Set<Entry<String, String>> entrySet() { 653 Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>(); 654 for (Entry<String, String> entry : maskedConf) { 655 set.add(entry); 656 } 657 return set; 658 } 659 }; 660 } 661 662 /** 663 * Return all the counters. <p> This method is thread safe. <p> The counters are live. The counter value is a 664 * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked. 665 * 666 * @return all counters. 667 */ 668 public Map<String, Map<String, Element<Long>>> getCounters() { 669 return counters; 670 } 671 672 /** 673 * Return all the timers. <p> This method is thread safe. <p> The timers are live. Once a timer is obtained, all 674 * its values are consistent (they are snapshot at the time the {@link Instrumentation.Element#getValue()} is 675 * invoked. 676 * 677 * @return all counters. 678 */ 679 public Map<String, Map<String, Element<Timer>>> getTimers() { 680 return timers; 681 } 682 683 /** 684 * Return all the variables. <p> This method is thread safe. <p> The variables are live. The variable value is a 685 * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked. 686 * 687 * @return all counters. 688 */ 689 public Map<String, Map<String, Element<Variable>>> getVariables() { 690 return variables; 691 } 692 693 /** 694 * Return a map containing all variables, counters and timers. 695 * 696 * @return a map containing all variables, counters and timers. 697 */ 698 public Map<String, Map<String, Map<String, Object>>> getAll() { 699 return all; 700 } 701 702 /** 703 * Return the string representation of the instrumentation. 704 * 705 * @return the string representation of the instrumentation. 706 */ 707 public String toString() { 708 String E = System.getProperty("line.separator"); 709 StringBuilder sb = new StringBuilder(4096); 710 for (String element : all.keySet()) { 711 sb.append(element).append(':').append(E); 712 List<String> groups = new ArrayList<String>(all.get(element).keySet()); 713 Collections.sort(groups); 714 for (String group : groups) { 715 sb.append(" ").append(group).append(':').append(E); 716 List<String> names = new ArrayList<String>(all.get(element).get(group).keySet()); 717 Collections.sort(names); 718 for (String name : names) { 719 sb.append(" ").append(name).append(": ").append(((Element) all.get(element). 720 get(group).get(name)).getValue()).append(E); 721 } 722 } 723 } 724 return sb.toString(); 725 } 726 727 private static class Sampler implements Element<Double>, Runnable { 728 private Lock lock = new ReentrantLock(); 729 private int samplingInterval; 730 private Variable<Long> variable; 731 private long[] values; 732 private int current; 733 private long valuesSum; 734 private double rate; 735 736 public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) { 737 this.samplingInterval = samplingInterval; 738 this.variable = variable; 739 values = new long[samplingPeriod / samplingInterval]; 740 valuesSum = 0; 741 current = -1; 742 } 743 744 public int getSamplingInterval() { 745 return samplingInterval; 746 } 747 748 public void run() { 749 try { 750 lock.lock(); 751 long newValue = variable.getValue(); 752 if (current == -1) { 753 valuesSum = newValue; 754 current = 0; 755 values[current] = newValue; 756 } 757 else { 758 current = (current + 1) % values.length; 759 valuesSum = valuesSum - values[current] + newValue; 760 values[current] = newValue; 761 } 762 rate = ((double) valuesSum) / values.length; 763 } 764 finally { 765 lock.unlock(); 766 } 767 } 768 769 public Double getValue() { 770 return rate; 771 } 772 } 773 774 /** 775 * Add a sampling variable. <p> This method is thread safe. 776 * 777 * @param group timer group. 778 * @param name timer name. 779 * @param period sampling period to compute rate. 780 * @param interval sampling frequency, how often the variable is probed. 781 * @param variable variable to sample. 782 */ 783 public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) { 784 if (scheduler == null) { 785 throw new IllegalStateException("scheduler not set, cannot sample"); 786 } 787 try { 788 samplerLock.lock(); 789 Map<String, Element<Double>> map = samplers.get(group); 790 if (map == null) { 791 map = samplers.get(group); 792 if (map == null) { 793 map = new HashMap<String, Element<Double>>(); 794 samplers.put(group, map); 795 } 796 } 797 if (map.containsKey(name)) { 798 throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name)); 799 } 800 else { 801 Sampler sampler = new Sampler(period, interval, variable); 802 map.put(name, sampler); 803 scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS); 804 } 805 } 806 finally { 807 samplerLock.unlock(); 808 } 809 } 810 811 /** 812 * Return all the samplers. <p> This method is thread safe. <p> The samplers are live. The sampler value is a 813 * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked. 814 * 815 * @return all counters. 816 */ 817 public Map<String, Map<String, Element<Double>>> getSamplers() { 818 return samplers; 819 } 820 821 public void stop() { 822 823 } 824 825}