/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.util;

import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p> All
 * instrumentation elements have a group and a name.
 * <p>
 * Elements are stored in two-level maps (group -> name -> element). Both levels are {@link ConcurrentHashMap}s so
 * they can be read without locking while other threads register new elements. As a consequence, group and name
 * must not be {@code null}.
 */
public class Instrumentation {
    private ScheduledExecutorService scheduler;
    private final Lock counterLock;
    private final Lock timerLock;
    private final Lock variableLock;
    private final Lock samplerLock;
    private final Map<String, Map<String, Map<String, Object>>> all;
    private final Map<String, Map<String, Element<Long>>> counters;
    private final Map<String, Map<String, Element<Timer>>> timers;
    private final Map<String, Map<String, Element<Variable>>> variables;
    private final Map<String, Map<String, Element<Double>>> samplers;

    /**
     * Instrumentation constructor.
     */
    @SuppressWarnings("unchecked")
    public Instrumentation() {
        counterLock = new ReentrantLock();
        timerLock = new ReentrantLock();
        variableLock = new ReentrantLock();
        samplerLock = new ReentrantLock();
        all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
        counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
        timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
        variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
        samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
        // 'all' is only populated here, so the non-thread-safe LinkedHashMap is never mutated after construction.
        all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
        all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
        all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
        all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
    }

    /**
     * Set the scheduler instance to handle the samplers.
     *
     * @param scheduler scheduler instance.
     */
    public void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Cron is a stopwatch that can be started/stopped several times. <p> This class is not thread safe, it does not
     * need to be. <p> It keeps track of the total time (first start to last stop) and the own (running) time (total
     * time minus the stopped intervals). <p> Once a Cron is complete it must be added to the corresponding group/name
     * in a Instrumentation instance.
     */
    public static class Cron {
        private long start;
        private long end;
        private long lapStart;
        private long own;
        private long total;
        private boolean running;

        /**
         * Creates new Cron, stopped, in zero.
         */
        public Cron() {
            running = false;
        }

        /**
         * Start the cron. If the cron is already running this call is a no-op.
         */
        public void start() {
            if (!running) {
                if (lapStart == 0) {
                    lapStart = System.currentTimeMillis();
                    if (start == 0) {
                        // First start ever: remember it as the beginning of the total time.
                        start = lapStart;
                        end = start;
                    }
                }
                running = true;
            }
        }

        /**
         * Stops the cron, adding the current lap to the own time. If the cron is not running this call is a no-op.
         */
        public void stop() {
            if (running) {
                end = System.currentTimeMillis();
                if (start == 0) {
                    start = end;
                }
                total = end - start;
                if (lapStart > 0) {
                    own += end - lapStart;
                    lapStart = 0;
                }
                running = false;
            }
        }

        /**
         * Return the start time of the cron. It must be stopped.
         *
         * @return the start time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getStart() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return start;
        }

        /**
         * Return the end time of the cron. It must be stopped.
         *
         * @return the end time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getEnd() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return end;
        }

        /**
         * Return the total time of the cron. It must be stopped.
         *
         * @return the total time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getTotal() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return total;
        }

        /**
         * Return the own time of the cron. It must be stopped.
         *
         * @return the own time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getOwn() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return own;
        }

    }

    /**
     * Gives access to a snapshot of an Instrumentation element (Counter, Timer). <p> Instrumentation element snapshots
     * are returned by the {@link Instrumentation#getCounters()} and {@link Instrumentation#getTimers()} methods.
     */
    public interface Element<T> {

        /**
         * Return the snapshot value of the Instrumentation element.
         *
         * @return the snapshot value of the Instrumentation element.
         */
        T getValue();
    }

    /**
     * Counter Instrumentation element.
     */
    private static class Counter extends AtomicLong implements Element<Long> {

        /**
         * Return the counter snapshot.
         *
         * @return the counter snapshot.
         */
        public Long getValue() {
            return get();
        }

        /**
         * Return the String representation of the counter value.
         *
         * @return the String representation of the counter value.
         */
        public String toString() {
            return Long.toString(get());
        }

    }

    /**
     * Timer Instrumentation element.
     * <p>
     * {@link #addCron(Cron)} and {@link #getValue()} are guarded by an internal lock. The plain accessors read the
     * fields without locking and are meant to be used on the snapshot returned by {@link #getValue()}.
     */
    public static class Timer implements Element<Timer> {
        final Lock lock = new ReentrantLock();
        private long ownTime;
        private long totalTime;
        private long ticks;
        private long ownSquareTime;
        private long totalSquareTime;
        private long ownMinTime;
        private long ownMaxTime;
        private long totalMinTime;
        private long totalMaxTime;

        /**
         * Timer constructor. <p> It is project private for test purposes.
         */
        Timer() {
        }

        /**
         * Return the String representation of the timer value.
         *
         * @return the String representation of the timer value.
         */
        public String toString() {
            return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
        }

        /**
         * Return the timer snapshot.
         *
         * @return a new Timer holding a consistent copy of this timer's values.
         */
        public Timer getValue() {
            lock.lock();
            try {
                Timer timer = new Timer();
                timer.ownTime = ownTime;
                timer.totalTime = totalTime;
                timer.ticks = ticks;
                timer.ownSquareTime = ownSquareTime;
                timer.totalSquareTime = totalSquareTime;
                timer.ownMinTime = ownMinTime;
                timer.ownMaxTime = ownMaxTime;
                timer.totalMinTime = totalMinTime;
                timer.totalMaxTime = totalMaxTime;
                return timer;
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Add a cron to a timer. <p> It is project private for test purposes.
         *
         * @param cron Cron to add; it must be stopped.
         * @throws IllegalStateException if the cron is still running.
         */
        void addCron(Cron cron) {
            lock.lock();
            try {
                long own = cron.getOwn();
                long total = cron.getTotal();
                ownTime += own;
                totalTime += total;
                ticks++;
                ownSquareTime += own * own;
                totalSquareTime += total * total;
                if (ticks == 1) {
                    ownMinTime = own;
                    ownMaxTime = own;
                    totalMinTime = total;
                    totalMaxTime = total;
                }
                else {
                    ownMinTime = Math.min(ownMinTime, own);
                    ownMaxTime = Math.max(ownMaxTime, own);
                    totalMinTime = Math.min(totalMinTime, total);
                    totalMaxTime = Math.max(totalMaxTime, total);
                }
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Return the own accumulated computing time by the timer.
         *
         * @return own accumulated computing time by the timer.
         */
        public long getOwn() {
            return ownTime;
        }

        /**
         * Return the total accumulated computing time by the timer.
         *
         * @return total accumulated computing time by the timer.
         */
        public long getTotal() {
            return totalTime;
        }

        /**
         * Return the number of times a cron was added to the timer.
         *
         * @return the number of times a cron was added to the timer.
         */
        public long getTicks() {
            return ticks;
        }

        /**
         * Return the sum of the square own times. <p> It can be used to calculate the standard deviation.
         *
         * @return the sum of the square own times.
         */
        public long getOwnSquareSum() {
            return ownSquareTime;
        }

        /**
         * Return the sum of the square total times. <p> It can be used to calculate the standard deviation.
         *
         * @return the sum of the square total times.
         */
        public long getTotalSquareSum() {
            return totalSquareTime;
        }

        /**
         * Returns the own minimum time.
         *
         * @return the own minimum time.
         */
        public long getOwnMin() {
            return ownMinTime;
        }

        /**
         * Returns the own maximum time.
         *
         * @return the own maximum time.
         */
        public long getOwnMax() {
            return ownMaxTime;
        }

        /**
         * Returns the total minimum time.
         *
         * @return the total minimum time.
         */
        public long getTotalMin() {
            return totalMinTime;
        }

        /**
         * Returns the total maximum time.
         *
         * @return the total maximum time.
         */
        public long getTotalMax() {
            return totalMaxTime;
        }

        /**
         * Returns the own average time (integer division, truncated).
         *
         * @return the own average time, or 0 if no cron was added.
         */
        public long getOwnAvg() {
            return (ticks != 0) ? ownTime / ticks : 0;
        }

        /**
         * Returns the total average time (integer division, truncated).
         *
         * @return the total average time, or 0 if no cron was added.
         */
        public long getTotalAvg() {
            return (ticks != 0) ? totalTime / ticks : 0;
        }

        /**
         * Returns the total time sample standard deviation.
         *
         * @return the total time standard deviation, or -1 if fewer than 2 crons were added.
         */
        public double getTotalStdDev() {
            return evalStdDev(ticks, totalTime, totalSquareTime);
        }

        /**
         * Returns the own time sample standard deviation.
         *
         * @return the own time standard deviation, or -1 if fewer than 2 crons were added.
         */
        public double getOwnStdDev() {
            return evalStdDev(ticks, ownTime, ownSquareTime);
        }

        /**
         * Sample standard deviation from count, sum and sum of squares:
         * sqrt((n * sum(x^2) - sum(x)^2) / (n * (n - 1))).
         * <p>
         * Evaluated in floating point: integer division would truncate the variance (e.g. samples 1 and 2 would
         * yield 0 instead of 0.5), and the long products could overflow.
         *
         * @param n number of samples.
         * @param sn sum of the samples.
         * @param ssn sum of the squared samples.
         * @return the sample standard deviation, or -1 if {@code n < 2}.
         */
        private double evalStdDev(long n, long sn, long ssn) {
            if (n < 2) {
                return -1;
            }
            double count = n;
            double sum = sn;
            double variance = (count * ssn - sum * sum) / (count * (count - 1));
            // Floating point rounding can produce a tiny negative variance for (near) constant samples.
            return Math.sqrt(Math.max(0d, variance));
        }

    }

    /**
     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p> This method is thread
     * safe.
     *
     * @param group timer group.
     * @param name timer name.
     * @param cron cron to add to the timer; it must be stopped.
     */
    public void addCron(String group, String name, Cron cron) {
        Map<String, Element<Timer>> map = timers.get(group);
        if (map == null) {
            timerLock.lock();
            try {
                map = timers.get(group);
                if (map == null) {
                    // Concurrent map: it is read below without holding timerLock.
                    map = new ConcurrentHashMap<String, Element<Timer>>();
                    timers.put(group, map);
                }
            }
            finally {
                timerLock.unlock();
            }
        }
        Timer timer = (Timer) map.get(name);
        if (timer == null) {
            timerLock.lock();
            try {
                timer = (Timer) map.get(name);
                if (timer == null) {
                    timer = new Timer();
                    map.put(name, timer);
                }
            }
            finally {
                timerLock.unlock();
            }
        }
        timer.addCron(cron);
    }

    /**
     * Increment an instrumentation counter. The counter is created if it does not exists. <p> This method is thread
     * safe.
     *
     * @param group counter group.
     * @param name counter name.
     * @param count increment to add to the counter.
     */
    public void incr(String group, String name, long count) {
        Map<String, Element<Long>> map = counters.get(group);
        if (map == null) {
            counterLock.lock();
            try {
                map = counters.get(group);
                if (map == null) {
                    // Concurrent map: it is read below without holding counterLock.
                    map = new ConcurrentHashMap<String, Element<Long>>();
                    counters.put(group, map);
                }
            }
            finally {
                counterLock.unlock();
            }
        }
        Counter counter = (Counter) map.get(name);
        if (counter == null) {
            counterLock.lock();
            try {
                counter = (Counter) map.get(name);
                if (counter == null) {
                    counter = new Counter();
                    map.put(name, counter);
                }
            }
            finally {
                counterLock.unlock();
            }
        }
        counter.addAndGet(count);
    }

    /**
     * Interface for instrumentation variables. <p> For example, the database service could expose the number of
     * currently active connections.
     */
    public interface Variable<T> extends Element<T> {
    }

    /**
     * Add an instrumentation variable. The variable must not exist. <p> This method is thread safe.
     *
     * @param group variable group.
     * @param name variable name.
     * @param variable variable to add.
     * @throws RuntimeException if a variable with the same group and name is already defined.
     */
    @SuppressWarnings("unchecked")
    public void addVariable(String group, String name, Variable variable) {
        // The duplicate check and the insert must be atomic, otherwise two threads could both register the name.
        variableLock.lock();
        try {
            Map<String, Element<Variable>> map = variables.get(group);
            if (map == null) {
                map = new ConcurrentHashMap<String, Element<Variable>>();
                variables.put(group, map);
            }
            if (map.containsKey(name)) {
                throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
            }
            map.put(name, variable);
        }
        finally {
            variableLock.unlock();
        }
    }

    /**
     * Return the JVM system properties, with password-like values masked by {@link PasswordMasker}.
     *
     * @return JVM system properties.
     */
    public Map<String, String> getJavaSystemProperties() {
        Map<String, String> unmasked = Maps.fromProperties(System.getProperties());
        return new PasswordMasker().mask(unmasked);
    }

    /**
     * Return the OS environment used to start Oozie, with password-like values masked by {@link PasswordMasker}.
     *
     * @return the OS environment used to start Oozie.
     */
    public Map<String, String> getOSEnv() {
        Map<String, String> unmasked = System.getenv();
        return new PasswordMasker().mask(unmasked);
    }

    /**
     * Return the current system configuration as a read-only Map&lt;String,String&gt; view of the masked
     * configuration. Mutating methods and {@code containsValue} throw {@link UnsupportedOperationException}.
     *
     * @return the current system configuration as a Map&lt;String,String&gt;.
     */
    public Map<String, String> getConfiguration() {
        final Configuration maskedConf = Services.get().get(ConfigurationService.class).getMaskedConfiguration();

        return new Map<String, String>() {
            public int size() {
                return maskedConf.size();
            }

            public boolean isEmpty() {
                return maskedConf.size() == 0;
            }

            public boolean containsKey(Object o) {
                return maskedConf.get((String) o) != null;
            }

            public boolean containsValue(Object o) {
                throw new UnsupportedOperationException();
            }

            public String get(Object o) {
                return maskedConf.get((String) o);
            }

            public String put(String s, String s1) {
                throw new UnsupportedOperationException();
            }

            public String remove(Object o) {
                throw new UnsupportedOperationException();
            }

            public void putAll(Map<? extends String, ? extends String> map) {
                throw new UnsupportedOperationException();
            }

            public void clear() {
                throw new UnsupportedOperationException();
            }

            public Set<String> keySet() {
                Set<String> set = new LinkedHashSet<String>();
                for (Entry<String, String> entry : maskedConf) {
                    set.add(entry.getKey());
                }
                return set;
            }

            public Collection<String> values() {
                // Map.values() holds one element per mapping, so duplicates must be kept (a Set would drop them).
                List<String> list = new ArrayList<String>();
                for (Entry<String, String> entry : maskedConf) {
                    list.add(entry.getValue());
                }
                return list;
            }

            public Set<Entry<String, String>> entrySet() {
                Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
                for (Entry<String, String> entry : maskedConf) {
                    set.add(entry);
                }
                return set;
            }
        };
    }

    /**
     * Return all the counters. <p> This method is thread safe. <p> The counters are live. The counter value is a
     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all counters.
     */
    public Map<String, Map<String, Element<Long>>> getCounters() {
        return counters;
    }

    /**
     * Return all the timers. <p> This method is thread safe. <p> The timers are live. Once a timer is obtained, all
     * its values are consistent (they are snapshot at the time the {@link Instrumentation.Element#getValue()} is
     * invoked).
     *
     * @return all timers.
     */
    public Map<String, Map<String, Element<Timer>>> getTimers() {
        return timers;
    }

    /**
     * Return all the variables. <p> This method is thread safe. <p> The variables are live. The variable value is a
     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all variables.
     */
    public Map<String, Map<String, Element<Variable>>> getVariables() {
        return variables;
    }

    /**
     * Return a map containing all variables, samplers, counters and timers, keyed by "variables", "samplers",
     * "counters" and "timers".
     *
     * @return a map containing all variables, samplers, counters and timers.
     */
    public Map<String, Map<String, Map<String, Object>>> getAll() {
        return all;
    }

    /**
     * Return the string representation of the instrumentation: every element kind, then its groups and names in
     * sorted order with their current snapshot values.
     *
     * @return the string representation of the instrumentation.
     */
    public String toString() {
        String lineSeparator = System.getProperty("line.separator");
        StringBuilder sb = new StringBuilder(4096);
        for (String element : all.keySet()) {
            sb.append(element).append(':').append(lineSeparator);
            List<String> groups = new ArrayList<String>(all.get(element).keySet());
            Collections.sort(groups);
            for (String group : groups) {
                sb.append("    ").append(group).append(':').append(lineSeparator);
                List<String> names = new ArrayList<String>(all.get(element).get(group).keySet());
                Collections.sort(names);
                for (String name : names) {
                    sb.append("        ").append(name).append(": ").append(((Element) all.get(element).
                            get(group).get(name)).getValue()).append(lineSeparator);
                }
            }
        }
        return sb.toString();
    }

    /**
     * Periodically probes a {@link Variable} and exposes the average of the last
     * {@code samplingPeriod / samplingInterval} probed values. Slots that have not been sampled yet count as zero.
     */
    private static class Sampler implements Element<Double>, Runnable {
        private final Lock lock = new ReentrantLock();
        private final int samplingInterval;
        private final Variable<Long> variable;
        private final long[] values;
        private int current;
        private long valuesSum;
        // Written by the scheduler thread and read by others without the lock; volatile makes the double
        // read atomic and visible.
        private volatile double rate;

        /**
         * @param samplingPeriod period covered by the average.
         * @param samplingInterval how often the variable is probed; must be positive and not larger than the period.
         * @param variable variable to sample.
         * @throws IllegalArgumentException if the interval is not positive or is larger than the period.
         */
        public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
            if (samplingInterval <= 0) {
                throw new IllegalArgumentException("sampling interval must be positive: " + samplingInterval);
            }
            if (samplingPeriod < samplingInterval) {
                // Otherwise the buffer would be empty and every run() would fail inside the scheduler.
                throw new IllegalArgumentException("sampling period [" + samplingPeriod
                        + "] must not be smaller than sampling interval [" + samplingInterval + "]");
            }
            this.samplingInterval = samplingInterval;
            this.variable = variable;
            values = new long[samplingPeriod / samplingInterval];
            valuesSum = 0;
            current = -1;
        }

        public int getSamplingInterval() {
            return samplingInterval;
        }

        /**
         * Probe the variable once, replacing the oldest value in the circular buffer and updating the rate.
         */
        public void run() {
            lock.lock();
            try {
                long newValue = variable.getValue();
                if (current == -1) {
                    valuesSum = newValue;
                    current = 0;
                    values[current] = newValue;
                }
                else {
                    current = (current + 1) % values.length;
                    valuesSum = valuesSum - values[current] + newValue;
                    values[current] = newValue;
                }
                rate = ((double) valuesSum) / values.length;
            }
            finally {
                lock.unlock();
            }
        }

        public Double getValue() {
            return rate;
        }
    }

    /**
     * Add a sampling variable. <p> This method is thread safe.
     *
     * @param group sampler group.
     * @param name sampler name.
     * @param period sampling period to compute rate.
     * @param interval sampling frequency, how often the variable is probed (in seconds).
     * @param variable variable to sample.
     * @throws IllegalStateException if no scheduler has been set.
     * @throws IllegalArgumentException if {@code interval} is not positive or is larger than {@code period}.
     * @throws RuntimeException if a sampler with the same group and name is already defined.
     */
    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
        if (scheduler == null) {
            throw new IllegalStateException("scheduler not set, cannot sample");
        }
        samplerLock.lock();
        try {
            Map<String, Element<Double>> map = samplers.get(group);
            if (map == null) {
                map = new ConcurrentHashMap<String, Element<Double>>();
                samplers.put(group, map);
            }
            if (map.containsKey(name)) {
                throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
            }
            Sampler sampler = new Sampler(period, interval, variable);
            map.put(name, sampler);
            scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
        }
        finally {
            samplerLock.unlock();
        }
    }

    /**
     * Return all the samplers. <p> This method is thread safe. <p> The samplers are live. The sampler value is a
     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all samplers.
     */
    public Map<String, Map<String, Element<Double>>> getSamplers() {
        return samplers;
    }

    /**
     * Currently a no-op. Sampler tasks run on the scheduler supplied via {@link #setScheduler} and are not
     * cancelled here.
     */
    public void stop() {

    }

}