/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements.
 * <p>
 * All instrumentation elements have a group and a name. Elements are kept in one registry per kind; each registry
 * maps a group name to a map of element name to element. Both levels are concurrent maps, so the live views handed
 * out by the getters can be read while other threads keep registering elements.
 */
public class Instrumentation {
    private ScheduledExecutorService scheduler;
    private final Lock counterLock;
    private final Lock timerLock;
    private final Lock variableLock;
    private final Lock samplerLock;
    private final Map<String, Map<String, Map<String, Object>>> all;
    private final Map<String, Map<String, Element<Long>>> counters;
    private final Map<String, Map<String, Element<Timer>>> timers;
    private final Map<String, Map<String, Element<Variable>>> variables;
    private final Map<String, Map<String, Element<Double>>> samplers;

    /**
     * Instrumentation constructor.
     * <p>
     * Creates the four empty registries and indexes them, in a fixed order, in the map returned by
     * {@link #getAll()}.
     */
    @SuppressWarnings("unchecked")
    public Instrumentation() {
        counterLock = new ReentrantLock();
        timerLock = new ReentrantLock();
        variableLock = new ReentrantLock();
        samplerLock = new ReentrantLock();
        all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
        counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
        timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
        variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
        samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
        // The double cast only erases the element type; 'all' exposes the very same registry instances.
        all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
        all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
        all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
        all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
    }

    /**
     * Set the scheduler instance to handle the samplers.
     *
     * @param scheduler scheduler instance.
     */
    public void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Return the element map of a group, creating and registering it if the group is not known yet.
     * <p>
     * The unlocked first lookup is safe because the registries are concurrent maps; the lock only serializes
     * the creation of a missing group. The created map is a concurrent map because it is read without locking.
     *
     * @param registry registry (group name to elements) to look into.
     * @param lock lock guarding the creation of groups in the registry.
     * @param group group name.
     * @return the element map registered for the group, never <code>null</code>.
     */
    private static <E> Map<String, Element<E>> getOrCreateGroup(Map<String, Map<String, Element<E>>> registry,
                                                                Lock lock, String group) {
        Map<String, Element<E>> elements = registry.get(group);
        if (elements == null) {
            lock.lock();
            try {
                elements = registry.get(group);
                if (elements == null) {
                    elements = new ConcurrentHashMap<String, Element<E>>();
                    registry.put(group, elements);
                }
            }
            finally {
                lock.unlock();
            }
        }
        return elements;
    }

    /**
     * Cron is a stopwatch that can be started/stopped several times.
     * <p>
     * This class is not thread safe, it does not need to be.
     * <p>
     * It keeps track of the total time (first start to last stop) and the running time (total time minus the
     * stopped intervals).
     * <p>
     * Once a Cron is complete it must be added to the corresponding group/name in an Instrumentation instance.
     */
    public static class Cron {
        private long start;
        private long end;
        private long lapStart;
        private long own;
        private long total;
        private boolean running;

        /**
         * Creates new Cron, stopped, in zero.
         */
        public Cron() {
            running = false;
        }

        /**
         * Start the cron. Starting a cron that is already running has no effect.
         */
        public void start() {
            if (!running) {
                if (lapStart == 0) {
                    lapStart = System.currentTimeMillis();
                    if (start == 0) {
                        // First start ever: it also fixes the beginning of the total interval.
                        start = lapStart;
                        end = start;
                    }
                }
                running = true;
            }
        }

        /**
         * Stops the cron. Stopping a cron that is not running has no effect.
         */
        public void stop() {
            if (running) {
                end = System.currentTimeMillis();
                if (start == 0) {
                    start = end;
                }
                total = end - start;
                if (lapStart > 0) {
                    // Only the lap that just finished is added to the own (running) time.
                    own += end - lapStart;
                    lapStart = 0;
                }
                running = false;
            }
        }

        /**
         * Return the start time of the cron. It must be stopped.
         *
         * @return the start time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getStart() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return start;
        }

        /**
         * Return the end time of the cron. It must be stopped.
         *
         * @return the end time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getEnd() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return end;
        }

        /**
         * Return the total time of the cron. It must be stopped.
         *
         * @return the total time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getTotal() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return total;
        }

        /**
         * Return the own time of the cron. It must be stopped.
         *
         * @return the own time of the cron.
         * @throws IllegalStateException if the cron is running.
         */
        public long getOwn() {
            if (running) {
                throw new IllegalStateException("Timer running");
            }
            return own;
        }

    }

    /**
     * Gives access to a snapshot of an Instrumentation element (Counter, Timer).
     * <p>
     * Instrumentation element snapshots are returned by the {@link Instrumentation#getCounters()} and
     * {@link Instrumentation#getTimers()} methods.
     */
    public interface Element<T> {

        /**
         * Return the snapshot value of the Instrumentation element.
         *
         * @return the snapshot value of the Instrumentation element.
         */
        T getValue();
    }

    /**
     * Counter Instrumentation element.
     */
    private static class Counter extends AtomicLong implements Element<Long> {

        /**
         * Return the counter snapshot.
         *
         * @return the counter snapshot.
         */
        @Override
        public Long getValue() {
            return get();
        }

        /**
         * Return the String representation of the counter value.
         *
         * @return the String representation of the counter value.
         */
        @Override
        public String toString() {
            return Long.toString(get());
        }

    }

    /**
     * Timer Instrumentation element.
     * <p>
     * Updates ({@link #addCron(Cron)}) and snapshots ({@link #getValue()}) are serialized by an internal lock; the
     * plain getters read the fields directly, so consistent readings should be taken from a snapshot.
     */
    public static class Timer implements Element<Timer> {
        final Lock lock = new ReentrantLock();
        private long ownTime;
        private long totalTime;
        private long ticks;
        private long ownSquareTime;
        private long totalSquareTime;
        private long ownMinTime;
        private long ownMaxTime;
        private long totalMinTime;
        private long totalMaxTime;

        /**
         * Timer constructor.
         * <p>
         * It is package private for test purposes.
         */
        Timer() {
        }

        /**
         * Return the String representation of the timer value.
         *
         * @return the String representation of the timer value.
         */
        @Override
        public String toString() {
            return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
        }

        /**
         * Return the timer snapshot.
         *
         * @return a new timer holding a copy, taken under the timer lock, of all the accumulated values.
         */
        @Override
        public Timer getValue() {
            lock.lock();
            try {
                Timer snapshot = new Timer();
                snapshot.ownTime = ownTime;
                snapshot.totalTime = totalTime;
                snapshot.ticks = ticks;
                snapshot.ownSquareTime = ownSquareTime;
                snapshot.totalSquareTime = totalSquareTime;
                snapshot.ownMinTime = ownMinTime;
                snapshot.ownMaxTime = ownMaxTime;
                snapshot.totalMinTime = totalMinTime;
                snapshot.totalMaxTime = totalMaxTime;
                return snapshot;
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Add a cron to a timer.
         * <p>
         * It is package private for test purposes.
         *
         * @param cron Cron to add, it must be stopped.
         */
        void addCron(Cron cron) {
            lock.lock();
            try {
                long own = cron.getOwn();
                long total = cron.getTotal();
                ownTime += own;
                totalTime += total;
                ticks++;
                ownSquareTime += own * own;
                totalSquareTime += total * total;
                if (ticks == 1) {
                    // First sample: it is both the minimum and the maximum.
                    ownMinTime = own;
                    ownMaxTime = own;
                    totalMinTime = total;
                    totalMaxTime = total;
                }
                else {
                    ownMinTime = Math.min(ownMinTime, own);
                    ownMaxTime = Math.max(ownMaxTime, own);
                    totalMinTime = Math.min(totalMinTime, total);
                    totalMaxTime = Math.max(totalMaxTime, total);
                }
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Return the own accumulated computing time by the timer.
         *
         * @return own accumulated computing time by the timer.
         */
        public long getOwn() {
            return ownTime;
        }

        /**
         * Return the total accumulated computing time by the timer.
         *
         * @return total accumulated computing time by the timer.
         */
        public long getTotal() {
            return totalTime;
        }

        /**
         * Return the number of times a cron was added to the timer.
         *
         * @return the number of times a cron was added to the timer.
         */
        public long getTicks() {
            return ticks;
        }

        /**
         * Return the sum of the square own times.
         * <p>
         * It can be used to calculate the standard deviation.
         *
         * @return the sum of the square own times.
         */
        public long getOwnSquareSum() {
            return ownSquareTime;
        }

        /**
         * Return the sum of the square total times.
         * <p>
         * It can be used to calculate the standard deviation.
         *
         * @return the sum of the square total times.
         */
        public long getTotalSquareSum() {
            return totalSquareTime;
        }

        /**
         * Returns the own minimum time.
         *
         * @return the own minimum time.
         */
        public long getOwnMin() {
            return ownMinTime;
        }

        /**
         * Returns the own maximum time.
         *
         * @return the own maximum time.
         */
        public long getOwnMax() {
            return ownMaxTime;
        }

        /**
         * Returns the total minimum time.
         *
         * @return the total minimum time.
         */
        public long getTotalMin() {
            return totalMinTime;
        }

        /**
         * Returns the total maximum time.
         *
         * @return the total maximum time.
         */
        public long getTotalMax() {
            return totalMaxTime;
        }

        /**
         * Returns the own average time.
         *
         * @return the own average time, <code>0</code> if no cron has been added.
         */
        public long getOwnAvg() {
            return (ticks != 0) ? ownTime / ticks : 0;
        }

        /**
         * Returns the total average time.
         *
         * @return the total average time, <code>0</code> if no cron has been added.
         */
        public long getTotalAvg() {
            return (ticks != 0) ? totalTime / ticks : 0;
        }

        /**
         * Returns the total time standard deviation.
         *
         * @return the total time standard deviation, <code>-1</code> if fewer than two crons have been added.
         */
        public double getTotalStdDev() {
            return evalStdDev(ticks, totalTime, totalSquareTime);
        }

        /**
         * Returns the own time standard deviation.
         *
         * @return the own time standard deviation, <code>-1</code> if fewer than two crons have been added.
         */
        public double getOwnStdDev() {
            return evalStdDev(ticks, ownTime, ownSquareTime);
        }

        /**
         * Evaluate the sample standard deviation, <code>sqrt((n * ssn - sn^2) / (n * (n - 1)))</code>.
         * <p>
         * The whole expression is evaluated in floating point: integer division would truncate the variance
         * before the square root is taken, and the integer products can overflow.
         *
         * @param n number of samples.
         * @param sn sum of the samples.
         * @param ssn sum of the squares of the samples.
         * @return the standard deviation, <code>-1</code> if there are fewer than two samples.
         */
        private static double evalStdDev(long n, long sn, long ssn) {
            if (n < 2) {
                return -1;
            }
            double count = n;
            double variance = (count * ssn - (double) sn * sn) / (count * (count - 1));
            // Rounding can push a mathematically zero variance slightly below zero; never return NaN for it.
            return Math.sqrt(Math.max(variance, 0.0d));
        }

    }

    /**
     * Add a cron to an instrumentation timer. The timer is created if it does not exist.
     * <p>
     * This method is thread safe.
     *
     * @param group timer group.
     * @param name timer name.
     * @param cron cron to add to the timer.
     */
    public void addCron(String group, String name, Cron cron) {
        Map<String, Element<Timer>> groupTimers = getOrCreateGroup(timers, timerLock, group);
        Timer timer = (Timer) groupTimers.get(name);
        if (timer == null) {
            timerLock.lock();
            try {
                // Re-check under the lock, another thread may have created the timer in the meantime.
                timer = (Timer) groupTimers.get(name);
                if (timer == null) {
                    timer = new Timer();
                    groupTimers.put(name, timer);
                }
            }
            finally {
                timerLock.unlock();
            }
        }
        timer.addCron(cron);
    }

    /**
     * Increment an instrumentation counter. The counter is created if it does not exist.
     * <p>
     * This method is thread safe.
     *
     * @param group counter group.
     * @param name counter name.
     * @param count increment to add to the counter.
     */
    public void incr(String group, String name, long count) {
        Map<String, Element<Long>> groupCounters = getOrCreateGroup(counters, counterLock, group);
        Counter counter = (Counter) groupCounters.get(name);
        if (counter == null) {
            counterLock.lock();
            try {
                // Re-check under the lock, another thread may have created the counter in the meantime.
                counter = (Counter) groupCounters.get(name);
                if (counter == null) {
                    counter = new Counter();
                    groupCounters.put(name, counter);
                }
            }
            finally {
                counterLock.unlock();
            }
        }
        counter.addAndGet(count);
    }

    /**
     * Interface for instrumentation variables.
     * <p>
     * For example the database service could expose the number of currently active connections.
     */
    public interface Variable<T> extends Element<T> {
    }

    /**
     * Add an instrumentation variable. The variable must not exist.
     * <p>
     * This method is thread safe: the existence check and the registration happen atomically under the
     * variable lock.
     *
     * @param group variable group.
     * @param name variable name, it cannot be <code>null</code>.
     * @param variable variable to add, it cannot be <code>null</code>.
     * @throws RuntimeException if a variable with the same group and name is already defined.
     */
    @SuppressWarnings("unchecked")
    public void addVariable(String group, String name, Variable variable) {
        Map<String, Element<Variable>> groupVariables = getOrCreateGroup(variables, variableLock, group);
        variableLock.lock();
        try {
            if (groupVariables.containsKey(name)) {
                throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
            }
            groupVariables.put(name, variable);
        }
        finally {
            variableLock.unlock();
        }
    }

    /**
     * Return the JVM system properties.
     *
     * @return JVM system properties, it is the live properties object, not a copy.
     */
    @SuppressWarnings("unchecked")
    public Map<String, String> getJavaSystemProperties() {
        return (Map<String, String>) (Object) System.getProperties();
    }

    /**
     * Return the OS environment used to start Oozie.
     *
     * @return the OS environment used to start Oozie.
     */
    public Map<String, String> getOSEnv() {
        return System.getenv();
    }

    /**
     * Return the current system configuration as a {@code Map<String,String>}.
     * <p>
     * The returned map is a read-only view over the masked configuration obtained from the
     * {@link ConfigurationService}: all mutators and <code>containsValue</code> throw
     * {@link UnsupportedOperationException}. The key, value and entry collections are copies built on each call.
     *
     * @return the current system configuration as a {@code Map<String,String>}.
     */
    public Map<String, String> getConfiguration() {
        final Configuration maskedConf = Services.get().get(ConfigurationService.class).getMaskedConfiguration();

        return new Map<String, String>() {
            @Override
            public int size() {
                return maskedConf.size();
            }

            @Override
            public boolean isEmpty() {
                return maskedConf.size() == 0;
            }

            @Override
            public boolean containsKey(Object o) {
                return maskedConf.get((String) o) != null;
            }

            @Override
            public boolean containsValue(Object o) {
                throw new UnsupportedOperationException();
            }

            @Override
            public String get(Object o) {
                return maskedConf.get((String) o);
            }

            @Override
            public String put(String s, String s1) {
                throw new UnsupportedOperationException();
            }

            @Override
            public String remove(Object o) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void putAll(Map<? extends String, ? extends String> map) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void clear() {
                throw new UnsupportedOperationException();
            }

            @Override
            public Set<String> keySet() {
                Set<String> keys = new LinkedHashSet<String>();
                for (Map.Entry<String, String> entry : maskedConf) {
                    keys.add(entry.getKey());
                }
                return keys;
            }

            @Override
            public Collection<String> values() {
                // A list, not a set: distinct keys may share a value and every occurrence must be reported.
                List<String> values = new ArrayList<String>();
                for (Map.Entry<String, String> entry : maskedConf) {
                    values.add(entry.getValue());
                }
                return values;
            }

            @Override
            public Set<Map.Entry<String, String>> entrySet() {
                Set<Map.Entry<String, String>> entries = new LinkedHashSet<Map.Entry<String, String>>();
                for (Map.Entry<String, String> entry : maskedConf) {
                    entries.add(entry);
                }
                return entries;
            }
        };
    }

    /**
     * Return all the counters.
     * <p>
     * This method is thread safe.
     * <p>
     * The counters are live. The counter value is a snapshot at the time the
     * {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all counters.
     */
    public Map<String, Map<String, Element<Long>>> getCounters() {
        return counters;
    }

    /**
     * Return all the timers.
     * <p>
     * This method is thread safe.
     * <p>
     * The timers are live. Once a timer is obtained, all its values are consistent (they are a snapshot at the
     * time the {@link Instrumentation.Element#getValue()} is invoked).
     *
     * @return all timers.
     */
    public Map<String, Map<String, Element<Timer>>> getTimers() {
        return timers;
    }

    /**
     * Return all the variables.
     * <p>
     * This method is thread safe.
     * <p>
     * The variables are live. The variable value is a snapshot at the time the
     * {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all variables.
     */
    public Map<String, Map<String, Element<Variable>>> getVariables() {
        return variables;
    }

    /**
     * Return a map containing all variables, samplers, counters and timers.
     *
     * @return a map, keyed by element kind, containing all variables, samplers, counters and timers.
     */
    public Map<String, Map<String, Map<String, Object>>> getAll() {
        return all;
    }

    /**
     * Return the string representation of the instrumentation.
     * <p>
     * Element kinds are listed in registration order; groups and names are sorted alphabetically.
     *
     * @return the string representation of the instrumentation.
     */
    @Override
    public String toString() {
        String E = System.getProperty("line.separator");
        StringBuilder sb = new StringBuilder(4096);
        for (Map.Entry<String, Map<String, Map<String, Object>>> kind : all.entrySet()) {
            sb.append(kind.getKey()).append(':').append(E);
            Map<String, Map<String, Object>> groups = kind.getValue();
            List<String> groupNames = new ArrayList<String>(groups.keySet());
            Collections.sort(groupNames);
            for (String group : groupNames) {
                // NOTE(review): the indentation literals below are kept exactly as found; confirm their width
                // against the upstream file in case whitespace was collapsed before this file was captured.
                sb.append(" ").append(group).append(':').append(E);
                Map<String, Object> elements = groups.get(group);
                List<String> names = new ArrayList<String>(elements.keySet());
                Collections.sort(names);
                for (String name : names) {
                    Element<?> element = (Element<?>) elements.get(name);
                    sb.append(" ").append(name).append(": ").append(element.getValue()).append(E);
                }
            }
        }
        return sb.toString();
    }

    /**
     * Sampler Instrumentation element.
     * <p>
     * Each run probes the variable and stores the value in a circular window; the exposed rate is the sum of
     * the window divided by the window size.
     */
    private static class Sampler implements Element<Double>, Runnable {
        private final Lock lock = new ReentrantLock();
        private final int samplingInterval;
        private final Variable<Long> variable;
        private final long[] values;
        private int current;
        private long valuesSum;
        // Written by the scheduler thread, read lock-free by getValue().
        private volatile double rate;

        /**
         * Sampler constructor.
         *
         * @param samplingPeriod period over which the rate is computed.
         * @param samplingInterval how often the variable is probed, it must be greater than zero.
         * @param variable variable to sample.
         * @throws IllegalArgumentException if the sampling interval is not greater than zero.
         */
        public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
            if (samplingInterval <= 0) {
                throw new IllegalArgumentException("sampling interval must be greater than zero");
            }
            this.samplingInterval = samplingInterval;
            this.variable = variable;
            // At least one slot: an empty window would make every run() fail inside the scheduler, which
            // silently cancels the periodic task.
            values = new long[Math.max(1, samplingPeriod / samplingInterval)];
            valuesSum = 0;
            current = -1;
        }

        /**
         * Return the sampling interval.
         *
         * @return the sampling interval.
         */
        public int getSamplingInterval() {
            return samplingInterval;
        }

        /**
         * Probe the variable once and refresh the rate.
         */
        @Override
        public void run() {
            // Probe outside the lock so foreign code never runs while the lock is held.
            long newValue = variable.getValue();
            lock.lock();
            try {
                if (current == -1) {
                    valuesSum = newValue;
                    current = 0;
                    values[current] = newValue;
                }
                else {
                    current = (current + 1) % values.length;
                    // Replace the oldest value of the window by the new one.
                    valuesSum = valuesSum - values[current] + newValue;
                    values[current] = newValue;
                }
                rate = ((double) valuesSum) / values.length;
            }
            finally {
                lock.unlock();
            }
        }

        /**
         * Return the sampler snapshot.
         *
         * @return the last computed rate.
         */
        @Override
        public Double getValue() {
            return rate;
        }
    }

    /**
     * Add a sampling variable.
     * <p>
     * This method is thread safe.
     *
     * @param group sampler group.
     * @param name sampler name.
     * @param period sampling period to compute rate.
     * @param interval sampling frequency, how often the variable is probed; it is given to the scheduler in
     * seconds.
     * @param variable variable to sample.
     * @throws IllegalStateException if the scheduler has not been set.
     * @throws IllegalArgumentException if the interval is not greater than zero.
     * @throws RuntimeException if a sampler with the same group and name is already defined.
     */
    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
        if (scheduler == null) {
            throw new IllegalStateException("scheduler not set, cannot sample");
        }
        samplerLock.lock();
        try {
            Map<String, Element<Double>> groupSamplers = samplers.get(group);
            if (groupSamplers == null) {
                groupSamplers = new ConcurrentHashMap<String, Element<Double>>();
                samplers.put(group, groupSamplers);
            }
            if (groupSamplers.containsKey(name)) {
                throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
            }
            Sampler sampler = new Sampler(period, interval, variable);
            // Schedule first: if the scheduler rejects the task no dead sampler is left registered.
            scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
            groupSamplers.put(name, sampler);
        }
        finally {
            samplerLock.unlock();
        }
    }

    /**
     * Return all the samplers.
     * <p>
     * This method is thread safe.
     * <p>
     * The samplers are live. The sampler value is a snapshot at the time the
     * {@link Instrumentation.Element#getValue()} is invoked.
     *
     * @return all samplers.
     */
    public Map<String, Map<String, Element<Double>>> getSamplers() {
        return samplers;
    }

}