/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.util;

import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.action.hadoop.PasswordMasker;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p> All
 * instrumentation elements have a group and a name.
 * <p>
 * Elements are stored in two-level maps (group, then name). Both levels are concurrent maps because they are read
 * without holding a lock and are handed out live by the getters. Consequently neither the group nor the name may be
 * {@code null}.
 */
public class Instrumentation {
    private ScheduledExecutorService scheduler;
    private final Lock counterLock;
    private final Lock timerLock;
    private final Lock variableLock;
    private final Lock samplerLock;
    private final Map<String, Map<String, Map<String, Object>>> all;
    private final Map<String, Map<String, Element<Long>>> counters;
    private final Map<String, Map<String, Element<Timer>>> timers;
    private final Map<String, Map<String, Element<Variable>>> variables;
    private final Map<String, Map<String, Element<Double>>> samplers;

    /**
     * Instrumentation constructor.
     * <p>
     * Creates the four empty element registries and registers each of them in the aggregated view returned by
     * {@link #getAll()} under the keys "variables", "samplers", "counters" and "timers" (in that order).
     */
    @SuppressWarnings("unchecked")
    public Instrumentation() {
        counterLock = new ReentrantLock();
        timerLock = new ReentrantLock();
        variableLock = new ReentrantLock();
        samplerLock = new ReentrantLock();
        all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
        counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
        timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
        variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
        samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
        // The double cast goes through Object because the element maps are not assignable to the generic "all" type.
        all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
        all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
        all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
        all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
    }

    /**
     * Set the scheduler instance to handle the samplers.
     *
     * @param scheduler scheduler instance.
     */
    public void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Cron is a stopwatch that can be started/stopped several times. <p> This class is not thread safe, it does not
     * need to be. <p> It keeps track of the total time (first start to last stop) and the running time (total time
     * minus the stopped intervals). <p> Once a Cron is complete it must be added to the corresponding group/name in a
     * Instrumentation instance.
     */
    public static class Cron {
        private long start;
        private long end;
        private long lapStart;
        private long own;
        private long total;
        private boolean running;

        /**
         * Creates new Cron, stopped, in zero.
         */
        public Cron() {
            running = false;
        }

        /**
         * Start the cron. Calling it on a cron that is already running has no effect.
         */
        public void start() {
            if (!running) {
                if (lapStart == 0) {
                    lapStart = System.currentTimeMillis();
                    // Only the very first start fixes the overall start time.
                    if (start == 0) {
                        start = lapStart;
                        end = start;
                    }
                }
                running = true;
            }
        }

        /**
         * Stops the cron. Calling it on a cron that is already stopped has no effect.
         */
        public void stop() {
            if (running) {
                end = System.currentTimeMillis();
                if (start == 0) {
                    start = end;
                }
                total = end - start;
                // Accumulate the lap that just finished into the running (own) time.
                if (lapStart > 0) {
                    own += end - lapStart;
                    lapStart = 0;
                }
                running = false;
            }
        }

        /**
         * Return the start time of the cron. It must be stopped.
         *
         * @return the start time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getStart() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return start;
        }

        /**
         * Return the end time of the cron. It must be stopped.
         *
         * @return the end time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getEnd() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return end;
        }

        /**
         * Return the total time of the cron. It must be stopped.
         *
         * @return the total time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getTotal() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return total;
        }

        /**
         * Return the own time of the cron. It must be stopped.
         *
         * @return the own time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getOwn() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return own;
        }

    }

    /**
     * Gives access to a snapshot of an Instrumentation element (Counter, Timer). <p> Instrumentation element snapshots
     * are returned by the {@link Instrumentation#getCounters()} and {@link Instrumentation#getTimers()} methods.
     */
    public interface Element<T> {

        /**
         * Return the snapshot value of the Instrumentation element.
         *
         * @return the snapshot value of the Instrumentation element.
         */
        T getValue();
    }

    /**
     * Counter Instrumentation element.
     */
    private static class Counter extends AtomicLong implements Element<Long> {

        /**
         * Return the counter snapshot.
         *
         * @return the counter snapshot.
         */
        @Override
        public Long getValue() {
            return get();
        }

        /**
         * Return the String representation of the counter value.
         *
         * @return the String representation of the counter value.
         */
        @Override
        public String toString() {
            return Long.toString(get());
        }

    }

    /**
     * Timer Instrumentation element.
     */
    public static class Timer implements Element<Timer> {
        final Lock lock = new ReentrantLock();
        private long ownTime;
        private long totalTime;
        private long ticks;
        private long ownSquareTime;
        private long totalSquareTime;
        private long ownMinTime;
        private long ownMaxTime;
        private long totalMinTime;
        private long totalMaxTime;

        /**
         * Timer constructor. <p> It is package private for test purposes.
         */
        Timer() {
        }

        /**
         * Return the String representation of the timer value.
         *
         * @return the String representation of the timer value.
         */
        @Override
        public String toString() {
            return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
        }

        /**
         * Return the timer snapshot.
         * <p>
         * The copy is taken while holding the timer lock, so all the values of the returned timer are consistent
         * with each other.
         *
         * @return the timer snapshot.
         */
        @Override
        public Timer getValue() {
            // Acquire before the try block so that unlock() only runs when the lock is actually held.
            lock.lock();
            try {
                Timer timer = new Timer();
                timer.ownTime = ownTime;
                timer.totalTime = totalTime;
                timer.ticks = ticks;
                timer.ownSquareTime = ownSquareTime;
                timer.totalSquareTime = totalSquareTime;
                timer.ownMinTime = ownMinTime;
                timer.ownMaxTime = ownMaxTime;
                timer.totalMinTime = totalMinTime;
                timer.totalMaxTime = totalMaxTime;
                return timer;
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Add a cron to a timer. <p> It is package private for test purposes.
         *
         * @param cron Cron to add, it must be stopped.
         */
        void addCron(Cron cron) {
            lock.lock();
            try {
                long own = cron.getOwn();
                long total = cron.getTotal();
                ownTime += own;
                totalTime += total;
                ticks++;
                ownSquareTime += own * own;
                totalSquareTime += total * total;
                if (ticks == 1) {
                    // First sample: it is both the minimum and the maximum.
                    ownMinTime = own;
                    ownMaxTime = own;
                    totalMinTime = total;
                    totalMaxTime = total;
                }
                else {
                    ownMinTime = Math.min(ownMinTime, own);
                    ownMaxTime = Math.max(ownMaxTime, own);
                    totalMinTime = Math.min(totalMinTime, total);
                    totalMaxTime = Math.max(totalMaxTime, total);
                }
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Return the own accumulated computing time by the timer.
         *
         * @return own accumulated computing time by the timer.
         */
        public long getOwn() {
            return ownTime;
        }

        /**
         * Return the total accumulated computing time by the timer.
         *
         * @return total accumulated computing time by the timer.
         */
        public long getTotal() {
            return totalTime;
        }

        /**
         * Return the number of times a cron was added to the timer.
         *
         * @return the number of times a cron was added to the timer.
         */
        public long getTicks() {
            return ticks;
        }

        /**
         * Return the sum of the square own times. <p> It can be used to calculate the standard deviation.
         *
         * @return the sum of the square own times.
         */
        public long getOwnSquareSum() {
            return ownSquareTime;
        }

        /**
         * Return the sum of the square total times. <p> It can be used to calculate the standard deviation.
         *
         * @return the sum of the square total times.
         */
        public long getTotalSquareSum() {
            return totalSquareTime;
        }

        /**
         * Returns the own minimum time.
         *
         * @return the own minimum time.
         */
        public long getOwnMin() {
            return ownMinTime;
        }

        /**
         * Returns the own maximum time.
         *
         * @return the own maximum time.
         */
        public long getOwnMax() {
            return ownMaxTime;
        }

        /**
         * Returns the total minimum time.
         *
         * @return the total minimum time.
         */
        public long getTotalMin() {
            return totalMinTime;
        }

        /**
         * Returns the total maximum time.
         *
         * @return the total maximum time.
         */
        public long getTotalMax() {
            return totalMaxTime;
        }

        /**
         * Returns the own average time.
         *
         * @return the own average time, zero if no cron has been added.
         */
        public long getOwnAvg() {
            return (ticks != 0) ? ownTime / ticks : 0;
        }

        /**
         * Returns the total average time.
         *
         * @return the total average time, zero if no cron has been added.
         */
        public long getTotalAvg() {
            return (ticks != 0) ? totalTime / ticks : 0;
        }

        /**
         * Returns the total time standard deviation.
         *
         * @return the total time standard deviation, <code>-1</code> if fewer than two crons have been added.
         */
        public double getTotalStdDev() {
            return evalStdDev(ticks, totalTime, totalSquareTime);
        }

        /**
         * Returns the own time standard deviation.
         *
         * @return the own time standard deviation, <code>-1</code> if fewer than two crons have been added.
         */
        public double getOwnStdDev() {
            return evalStdDev(ticks, ownTime, ownSquareTime);
        }

        /**
         * Compute the sample standard deviation from the running sums.
         *
         * @param n number of samples.
         * @param sn sum of the samples.
         * @param ssn sum of the squares of the samples.
         * @return the sample standard deviation, <code>-1</code> if there are fewer than two samples.
         */
        private double evalStdDev(long n, long sn, long ssn) {
            if (n < 2) {
                return -1;
            }
            // The numerator and denominator are exact in long arithmetic, but the division must be done in floating
            // point: an integer division would truncate the variance before the square root is taken.
            double numerator = n * ssn - sn * sn;
            double denominator = n * (n - 1);
            return Math.sqrt(numerator / denominator);
        }

    }

    /**
     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p> This method is thread
     * safe.
     *
     * @param group timer group.
     * @param name timer name.
     * @param cron cron to add to the timer.
     */
    public void addCron(String group, String name, Cron cron) {
        Map<String, Element<Timer>> map = timers.get(group);
        if (map == null) {
            timerLock.lock();
            try {
                map = timers.get(group);
                if (map == null) {
                    // Concurrent map: it is read below, and by the users of getTimers(), without holding timerLock.
                    map = new ConcurrentHashMap<String, Element<Timer>>();
                    timers.put(group, map);
                }
            }
            finally {
                timerLock.unlock();
            }
        }
        Timer timer = (Timer) map.get(name);
        if (timer == null) {
            timerLock.lock();
            try {
                timer = (Timer) map.get(name);
                if (timer == null) {
                    timer = new Timer();
                    map.put(name, timer);
                }
            }
            finally {
                timerLock.unlock();
            }
        }
        timer.addCron(cron);
    }

    /**
     * Increment an instrumentation counter. The counter is created if it does not exists. <p> This method is thread
     * safe.
     *
     * @param group counter group.
     * @param name counter name.
     * @param count increment to add to the counter.
     */
    public void incr(String group, String name, long count) {
        Map<String, Element<Long>> map = counters.get(group);
        if (map == null) {
            counterLock.lock();
            try {
                map = counters.get(group);
                if (map == null) {
                    // Concurrent map: it is read below, and by the users of getCounters(), without holding counterLock.
                    map = new ConcurrentHashMap<String, Element<Long>>();
                    counters.put(group, map);
                }
            }
            finally {
                counterLock.unlock();
            }
        }
        Counter counter = (Counter) map.get(name);
        if (counter == null) {
            counterLock.lock();
            try {
                counter = (Counter) map.get(name);
                if (counter == null) {
                    counter = new Counter();
                    map.put(name, counter);
                }
            }
            finally {
                counterLock.unlock();
            }
        }
        counter.addAndGet(count);
    }

    /**
     * Interface for instrumentation variables. <p> For example the database service could expose the number of
     * currently active connections.
     */
    public interface Variable<T> extends Element<T> {
    }

    /**
     * Add an instrumentation variable. The variable must not exist. <p> This method is thread safe.
     *
     * @param group variable group.
     * @param name variable name.
     * @param variable variable to add.
     * @throws RuntimeException if a variable with the same group and name is already defined.
     */
    @SuppressWarnings("unchecked")
    public void addVariable(String group, String name, Variable variable) {
        // The existence check and the insertion are done under the same lock, otherwise two threads could both pass
        // the check and one registration would silently overwrite the other.
        variableLock.lock();
        try {
            Map<String, Element<Variable>> map = variables.get(group);
            if (map == null) {
                map = new ConcurrentHashMap<String, Element<Variable>>();
                variables.put(group, map);
            }
            if (map.containsKey(name)) {
                throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
            }
            map.put(name, variable);
        }
        finally {
            variableLock.unlock();
        }
    }

    /**
     * Return the JVM system properties, passed through a {@link PasswordMasker}.
     *
     * @return JVM system properties.
     */
    public Map<String, String> getJavaSystemProperties() {
        Map<String, String> unmasked = Maps.fromProperties(System.getProperties());
        return new PasswordMasker().mask(unmasked);
    }

    /**
     * Return the OS environment used to start Oozie, passed through a {@link PasswordMasker}.
     *
     * @return the OS environment used to start Oozie.
     */
    public Map<String, String> getOSEnv() {
        Map<String, String> unmasked = System.getenv();
        return new PasswordMasker().mask(unmasked);
    }

    /**
     * Return the current system configuration as a {@code Map<String,String>}.
     * <p>
     * The returned map is a read-only view over the masked configuration obtained from the
     * {@link ConfigurationService}: all the mutating operations, and <code>containsValue</code>, throw
     * {@link UnsupportedOperationException}.
     *
     * @return the current system configuration as a {@code Map<String,String>}.
     */
    public Map<String, String> getConfiguration() {
        final Configuration maskedConf = Services.get().get(ConfigurationService.class).getMaskedConfiguration();

        return new Map<String, String>() {
            @Override
            public int size() {
                return maskedConf.size();
            }

            @Override
            public boolean isEmpty() {
                return maskedConf.size() == 0;
            }

            @Override
            public boolean containsKey(Object o) {
                return maskedConf.get((String) o) != null;
            }

            @Override
            public boolean containsValue(Object o) {
                throw new UnsupportedOperationException();
            }

            @Override
            public String get(Object o) {
                return maskedConf.get((String) o);
            }

            @Override
            public String put(String s, String s1) {
                throw new UnsupportedOperationException();
            }

            @Override
            public String remove(Object o) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void putAll(Map<? extends String, ? extends String> map) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void clear() {
                throw new UnsupportedOperationException();
            }

            @Override
            public Set<String> keySet() {
                Set<String> set = new LinkedHashSet<String>();
                for (Entry<String, String> entry : maskedConf) {
                    set.add(entry.getKey());
                }
                return set;
            }

            @Override
            public Collection<String> values() {
                // A list, not a set: distinct keys may share the same value and every one of them must be reported.
                List<String> list = new ArrayList<String>();
                for (Entry<String, String> entry : maskedConf) {
                    list.add(entry.getValue());
                }
                return list;
            }

            @Override
            public Set<Entry<String, String>> entrySet() {
                Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
                for (Entry<String, String> entry : maskedConf) {
                    set.add(entry);
                }
                return set;
            }
        };
    }

    /**
     * Return all the counters. <p> This method is thread safe. <p> The counters are live. The counter value is a
     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all counters.
     */
    public Map<String, Map<String, Element<Long>>> getCounters() {
        return counters;
    }

    /**
     * Return all the timers. <p> This method is thread safe. <p> The timers are live. Once a timer is obtained, all
     * its values are consistent (they are snapshot at the time the {@link Instrumentation.Element#getValue()} is
     * invoked).
     *
     * @return all timers.
     */
    public Map<String, Map<String, Element<Timer>>> getTimers() {
        return timers;
    }

    /**
     * Return all the variables. <p> This method is thread safe. <p> The variables are live. The variable value is a
     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all variables.
     */
    public Map<String, Map<String, Element<Variable>>> getVariables() {
        return variables;
    }

    /**
     * Return a map containing all variables, samplers, counters and timers.
     *
     * @return a map containing all variables, samplers, counters and timers.
     */
    public Map<String, Map<String, Map<String, Object>>> getAll() {
        return all;
    }

    /**
     * Return the string representation of the instrumentation.
     * <p>
     * Element types are listed in registration order; groups and names are sorted alphabetically.
     *
     * @return the string representation of the instrumentation.
     */
    @Override
    public String toString() {
        String E = System.getProperty("line.separator");
        StringBuilder sb = new StringBuilder(4096);
        for (String element : all.keySet()) {
            sb.append(element).append(':').append(E);
            List<String> groups = new ArrayList<String>(all.get(element).keySet());
            Collections.sort(groups);
            for (String group : groups) {
                sb.append(" ").append(group).append(':').append(E);
                List<String> names = new ArrayList<String>(all.get(element).get(group).keySet());
                Collections.sort(names);
                for (String name : names) {
                    Element<?> value = (Element<?>) all.get(element).get(group).get(name);
                    sb.append(" ").append(name).append(": ").append(value.getValue()).append(E);
                }
            }
        }
        return sb.toString();
    }

    /**
     * Sampler Instrumentation element.
     * <p>
     * Each run probes the variable and stores the value in a fixed-size circular window. The exposed rate is the sum
     * of the values in the window divided by the window size (also while the window is still filling up).
     */
    private static class Sampler implements Element<Double>, Runnable {
        private final Lock lock = new ReentrantLock();
        private final int samplingInterval;
        private final Variable<Long> variable;
        private final long[] values;
        private int current;
        private long valuesSum;
        // Written under the lock by run() and read without it by getValue(), hence volatile.
        private volatile double rate;

        /**
         * Sampler constructor.
         *
         * @param samplingPeriod length of the window used to compute the rate, in the same unit as the interval.
         * @param samplingInterval how often the variable is probed, it must be greater than zero.
         * @param variable variable to sample.
         * @throws IllegalArgumentException if the sampling interval is not greater than zero.
         */
        public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
            if (samplingInterval <= 0) {
                throw new IllegalArgumentException("samplingInterval must be greater than zero: " + samplingInterval);
            }
            this.samplingInterval = samplingInterval;
            this.variable = variable;
            // At least one slot: an empty window would make every run() fail with an index/arithmetic error.
            values = new long[Math.max(1, samplingPeriod / samplingInterval)];
            valuesSum = 0;
            current = -1;
        }

        public int getSamplingInterval() {
            return samplingInterval;
        }

        /**
         * Probe the variable once and update the rate.
         */
        @Override
        public void run() {
            lock.lock();
            try {
                long newValue = variable.getValue();
                if (current == -1) {
                    // Very first sample.
                    valuesSum = newValue;
                    current = 0;
                    values[current] = newValue;
                }
                else {
                    // Advance the circular window, replacing the oldest value in the running sum.
                    current = (current + 1) % values.length;
                    valuesSum = valuesSum - values[current] + newValue;
                    values[current] = newValue;
                }
                rate = ((double) valuesSum) / values.length;
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Return the rate computed by the last run.
         *
         * @return the rate computed by the last run.
         */
        @Override
        public Double getValue() {
            return rate;
        }
    }

    /**
     * Add a sampling variable. <p> This method is thread safe.
     *
     * @param group sampler group.
     * @param name sampler name.
     * @param period sampling period to compute rate, in seconds.
     * @param interval sampling frequency, how often the variable is probed, in seconds. It must be greater than zero.
     * @param variable variable to sample.
     * @throws IllegalStateException if the scheduler has not been set.
     * @throws IllegalArgumentException if the interval is not greater than zero.
     * @throws RuntimeException if a sampler with the same group and name is already defined.
     */
    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
        if (scheduler == null) {
            throw new IllegalStateException("scheduler not set, cannot sample");
        }
        samplerLock.lock();
        try {
            Map<String, Element<Double>> map = samplers.get(group);
            if (map == null) {
                // Concurrent map: the users of getSamplers() read it without holding samplerLock.
                map = new ConcurrentHashMap<String, Element<Double>>();
                samplers.put(group, map);
            }
            if (map.containsKey(name)) {
                throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
            }
            Sampler sampler = new Sampler(period, interval, variable);
            map.put(name, sampler);
            scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
        }
        finally {
            samplerLock.unlock();
        }
    }

    /**
     * Return all the samplers. <p> This method is thread safe. <p> The samplers are live. The sampler value is a
     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all samplers.
     */
    public Map<String, Map<String, Element<Double>>> getSamplers() {
        return samplers;
    }

    /**
     * Stop the instrumentation. This implementation does nothing.
     */
    public void stop() {

    }

}