This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.util;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.service.ConfigurationService;
022
023 import java.util.ArrayList;
024 import java.util.Collection;
025 import java.util.Collections;
026 import java.util.HashMap;
027 import java.util.LinkedHashMap;
028 import java.util.LinkedHashSet;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032 import java.util.concurrent.ConcurrentHashMap;
033 import java.util.concurrent.ScheduledExecutorService;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.atomic.AtomicLong;
036 import java.util.concurrent.locks.Lock;
037 import java.util.concurrent.locks.ReentrantLock;
038
039 /**
040 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p/> All
041 * instrumentation elements have a group and a name.
042 */
043 public class Instrumentation {
044 private ScheduledExecutorService scheduler;
045 private Lock counterLock;
046 private Lock timerLock;
047 private Lock variableLock;
048 private Lock samplerLock;
049 private Configuration configuration;
050 private Map<String, Map<String, Map<String, Object>>> all;
051 private Map<String, Map<String, Element<Long>>> counters;
052 private Map<String, Map<String, Element<Timer>>> timers;
053 private Map<String, Map<String, Element<Variable>>> variables;
054 private Map<String, Map<String, Element<Double>>> samplers;
055
056 /**
057 * Instrumentation constructor.
058 */
059 @SuppressWarnings("unchecked")
060 public Instrumentation() {
061 counterLock = new ReentrantLock();
062 timerLock = new ReentrantLock();
063 variableLock = new ReentrantLock();
064 samplerLock = new ReentrantLock();
065 all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
066 counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
067 timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
068 variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
069 samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
070 all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
071 all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
072 all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
073 all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
074 }
075
076 /**
077 * Set the scheduler instance to handle the samplers.
078 *
079 * @param scheduler scheduler instance.
080 */
081 public void setScheduler(ScheduledExecutorService scheduler) {
082 this.scheduler = scheduler;
083 }
084
085 /**
086 * Cron is a stopwatch that can be started/stopped several times. <p/> This class is not thread safe, it does not
087 * need to be. <p/> It keeps track of the total time (first start to last stop) and the running time (total time
088 * minus the stopped intervals). <p/> Once a Cron is complete it must be added to the corresponding group/name in a
089 * Instrumentation instance.
090 */
091 public static class Cron {
092 private long start;
093 private long end;
094 private long lapStart;
095 private long own;
096 private long total;
097 private boolean running;
098
099 /**
100 * Creates new Cron, stopped, in zero.
101 */
102 public Cron() {
103 running = false;
104 }
105
106 /**
107 * Start the cron. It cannot be already started.
108 */
109 public void start() {
110 if (!running) {
111 if (lapStart == 0) {
112 lapStart = System.currentTimeMillis();
113 if (start == 0) {
114 start = lapStart;
115 end = start;
116 }
117 }
118 running = true;
119 }
120 }
121
122 /**
123 * Stops the cron. It cannot be already stopped.
124 */
125 public void stop() {
126 if (running) {
127 end = System.currentTimeMillis();
128 if (start == 0) {
129 start = end;
130 }
131 total = end - start;
132 if (lapStart > 0) {
133 own += end - lapStart;
134 lapStart = 0;
135 }
136 running = false;
137 }
138 }
139
140 /**
141 * Return the start time of the cron. It must be stopped.
142 *
143 * @return the start time of the cron.
144 */
145 public long getStart() {
146 if (running) {
147 throw new IllegalStateException("Timer running");
148 }
149 return start;
150 }
151
152 /**
153 * Return the end time of the cron. It must be stopped.
154 *
155 * @return the end time of the cron.
156 */
157 public long getEnd() {
158 if (running) {
159 throw new IllegalStateException("Timer running");
160 }
161 return end;
162 }
163
164 /**
165 * Return the total time of the cron. It must be stopped.
166 *
167 * @return the total time of the cron.
168 */
169 public long getTotal() {
170 if (running) {
171 throw new IllegalStateException("Timer running");
172 }
173 return total;
174 }
175
176 /**
177 * Return the own time of the cron. It must be stopped.
178 *
179 * @return the own time of the cron.
180 */
181 public long getOwn() {
182 if (running) {
183 throw new IllegalStateException("Timer running");
184 }
185 return own;
186 }
187
188 }
189
/**
 * Read access to the value of an instrumentation element (counter, timer, variable or
 * sampler).
 * <p>
 * Elements are what the maps returned by {@link Instrumentation#getCounters()},
 * {@link Instrumentation#getTimers()} and the other accessors contain.
 *
 * @param <T> type of the value exposed by the element.
 */
public interface Element<T> {

    /**
     * Return the snapshot value of the instrumentation element.
     *
     * @return the snapshot value of the instrumentation element.
     */
    T getValue();
}
203
204 /**
205 * Counter Instrumentation element.
206 */
207 private static class Counter extends AtomicLong implements Element<Long> {
208
209 /**
210 * Return the counter snapshot.
211 *
212 * @return the counter snapshot.
213 */
214 public Long getValue() {
215 return get();
216 }
217
218 /**
219 * Return the String representation of the counter value.
220 *
221 * @return the String representation of the counter value.
222 */
223 public String toString() {
224 return Long.toString(get());
225 }
226
227 }
228
229 /**
230 * Timer Instrumentation element.
231 */
232 public static class Timer implements Element<Timer> {
233 Lock lock = new ReentrantLock();
234 private long ownTime;
235 private long totalTime;
236 private long ticks;
237 private long ownSquareTime;
238 private long totalSquareTime;
239 private long ownMinTime;
240 private long ownMaxTime;
241 private long totalMinTime;
242 private long totalMaxTime;
243
244 /**
245 * Timer constructor. <p/> It is project private for test purposes.
246 */
247 Timer() {
248 }
249
250 /**
251 * Return the String representation of the timer value.
252 *
253 * @return the String representation of the timer value.
254 */
255 public String toString() {
256 return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
257 }
258
259 /**
260 * Return the timer snapshot.
261 *
262 * @return the timer snapshot.
263 */
264 public Timer getValue() {
265 try {
266 lock.lock();
267 Timer timer = new Timer();
268 timer.ownTime = ownTime;
269 timer.totalTime = totalTime;
270 timer.ticks = ticks;
271 timer.ownSquareTime = ownSquareTime;
272 timer.totalSquareTime = totalSquareTime;
273 timer.ownMinTime = ownMinTime;
274 timer.ownMaxTime = ownMaxTime;
275 timer.totalMinTime = totalMinTime;
276 timer.totalMaxTime = totalMaxTime;
277 return timer;
278 }
279 finally {
280 lock.unlock();
281 }
282 }
283
284 /**
285 * Add a cron to a timer. <p/> It is project private for test purposes.
286 *
287 * @param cron Cron to add.
288 */
289 void addCron(Cron cron) {
290 try {
291 lock.lock();
292 long own = cron.getOwn();
293 long total = cron.getTotal();
294 ownTime += own;
295 totalTime += total;
296 ticks++;
297 ownSquareTime += own * own;
298 totalSquareTime += total * total;
299 if (ticks == 1) {
300 ownMinTime = own;
301 ownMaxTime = own;
302 totalMinTime = total;
303 totalMaxTime = total;
304 }
305 else {
306 ownMinTime = Math.min(ownMinTime, own);
307 ownMaxTime = Math.max(ownMaxTime, own);
308 totalMinTime = Math.min(totalMinTime, total);
309 totalMaxTime = Math.max(totalMaxTime, total);
310 }
311 }
312 finally {
313 lock.unlock();
314 }
315 }
316
317 /**
318 * Return the own accumulated computing time by the timer.
319 *
320 * @return own accumulated computing time by the timer.
321 */
322 public long getOwn() {
323 return ownTime;
324 }
325
326 /**
327 * Return the total accumulated computing time by the timer.
328 *
329 * @return total accumulated computing time by the timer.
330 */
331 public long getTotal() {
332 return totalTime;
333 }
334
335 /**
336 * Return the number of times a cron was added to the timer.
337 *
338 * @return the number of times a cron was added to the timer.
339 */
340 public long getTicks() {
341 return ticks;
342 }
343
344 /**
345 * Return the sum of the square own times. <p/> It can be used to calculate the standard deviation.
346 *
347 * @return the sum of the square own timer.
348 */
349 public long getOwnSquareSum() {
350 return ownSquareTime;
351 }
352
353 /**
354 * Return the sum of the square total times. <p/> It can be used to calculate the standard deviation.
355 *
356 * @return the sum of the square own timer.
357 */
358 public long getTotalSquareSum() {
359 return totalSquareTime;
360 }
361
362 /**
363 * Returns the own minimum time.
364 *
365 * @return the own minimum time.
366 */
367 public long getOwnMin() {
368 return ownMinTime;
369 }
370
371 /**
372 * Returns the own maximum time.
373 *
374 * @return the own maximum time.
375 */
376 public long getOwnMax() {
377 return ownMaxTime;
378 }
379
380 /**
381 * Returns the total minimum time.
382 *
383 * @return the total minimum time.
384 */
385 public long getTotalMin() {
386 return totalMinTime;
387 }
388
389 /**
390 * Returns the total maximum time.
391 *
392 * @return the total maximum time.
393 */
394 public long getTotalMax() {
395 return totalMaxTime;
396 }
397
398 /**
399 * Returns the own average time.
400 *
401 * @return the own average time.
402 */
403 public long getOwnAvg() {
404 return (ticks != 0) ? ownTime / ticks : 0;
405 }
406
407 /**
408 * Returns the total average time.
409 *
410 * @return the total average time.
411 */
412 public long getTotalAvg() {
413 return (ticks != 0) ? totalTime / ticks : 0;
414 }
415
416 /**
417 * Returns the total time standard deviation.
418 *
419 * @return the total time standard deviation.
420 */
421 public double getTotalStdDev() {
422 return evalStdDev(ticks, totalTime, totalSquareTime);
423 }
424
425 /**
426 * Returns the own time standard deviation.
427 *
428 * @return the own time standard deviation.
429 */
430 public double getOwnStdDev() {
431 return evalStdDev(ticks, ownTime, ownSquareTime);
432 }
433
434 private double evalStdDev(long n, long sn, long ssn) {
435 return (n < 2) ? -1 : Math.sqrt((n * ssn - sn * sn) / (n * (n - 1)));
436 }
437
438 }
439
440 /**
441 * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p/> This method is thread
442 * safe.
443 *
444 * @param group timer group.
445 * @param name timer name.
446 * @param cron cron to add to the timer.
447 */
448 public void addCron(String group, String name, Cron cron) {
449 Map<String, Element<Timer>> map = timers.get(group);
450 if (map == null) {
451 try {
452 timerLock.lock();
453 map = timers.get(group);
454 if (map == null) {
455 map = new HashMap<String, Element<Timer>>();
456 timers.put(group, map);
457 }
458 }
459 finally {
460 timerLock.unlock();
461 }
462 }
463 Timer timer = (Timer) map.get(name);
464 if (timer == null) {
465 try {
466 timerLock.lock();
467 timer = (Timer) map.get(name);
468 if (timer == null) {
469 timer = new Timer();
470 map.put(name, timer);
471 }
472 }
473 finally {
474 timerLock.unlock();
475 }
476 }
477 timer.addCron(cron);
478 }
479
480 /**
481 * Increment an instrumentation counter. The counter is created if it does not exists. <p/> This method is thread
482 * safe.
483 *
484 * @param group counter group.
485 * @param name counter name.
486 * @param count increment to add to the counter.
487 */
488 public void incr(String group, String name, long count) {
489 Map<String, Element<Long>> map = counters.get(group);
490 if (map == null) {
491 try {
492 counterLock.lock();
493 map = counters.get(group);
494 if (map == null) {
495 map = new HashMap<String, Element<Long>>();
496 counters.put(group, map);
497 }
498 }
499 finally {
500 counterLock.unlock();
501 }
502 }
503 Counter counter = (Counter) map.get(name);
504 if (counter == null) {
505 try {
506 counterLock.lock();
507 counter = (Counter) map.get(name);
508 if (counter == null) {
509 counter = new Counter();
510 map.put(name, counter);
511 }
512 }
513 finally {
514 counterLock.unlock();
515 }
516 }
517 counter.addAndGet(count);
518 }
519
520 /**
521 * Interface for instrumentation variables. <p/> For example a the database service could expose the number of
522 * currently active connections.
523 */
524 public interface Variable<T> extends Element<T> {
525 }
526
527 /**
528 * Add an instrumentation variable. The variable must not exist. <p/> This method is thread safe.
529 *
530 * @param group counter group.
531 * @param name counter name.
532 * @param variable variable to add.
533 */
534 @SuppressWarnings("unchecked")
535 public void addVariable(String group, String name, Variable variable) {
536 Map<String, Element<Variable>> map = variables.get(group);
537 if (map == null) {
538 try {
539 variableLock.lock();
540 map = variables.get(group);
541 if (map == null) {
542 map = new HashMap<String, Element<Variable>>();
543 variables.put(group, map);
544 }
545 }
546 finally {
547 variableLock.unlock();
548 }
549 }
550 if (map.containsKey(name)) {
551 throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
552 }
553 map.put(name, variable);
554 }
555
556 /**
557 * Set the system configuration.
558 *
559 * @param configuration system configuration.
560 */
561 public void setConfiguration(Configuration configuration) {
562 this.configuration = configuration;
563 }
564
565 /**
566 * Return the JVM system properties.
567 *
568 * @return JVM system properties.
569 */
570 @SuppressWarnings("unchecked")
571 public Map<String, String> getJavaSystemProperties() {
572 return (Map<String, String>) (Object) System.getProperties();
573 }
574
575 /**
576 * Return the OS environment used to start Oozie.
577 *
578 * @return the OS environment used to start Oozie.
579 */
580 public Map<String, String> getOSEnv() {
581 return System.getenv();
582 }
583
584 /**
585 * Return the current system configuration as a Map<String,String>.
586 *
587 * @return the current system configuration as a Map<String,String>.
588 */
589 public Map<String, String> getConfiguration() {
590 final Configuration maskedConf = ConfigurationService.maskPasswords(configuration);
591
592 return new Map<String, String>() {
593 public int size() {
594 return maskedConf.size();
595 }
596
597 public boolean isEmpty() {
598 return maskedConf.size() == 0;
599 }
600
601 public boolean containsKey(Object o) {
602 return maskedConf.get((String) o) != null;
603 }
604
605 public boolean containsValue(Object o) {
606 throw new UnsupportedOperationException();
607 }
608
609 public String get(Object o) {
610 return maskedConf.get((String) o);
611 }
612
613 public String put(String s, String s1) {
614 throw new UnsupportedOperationException();
615 }
616
617 public String remove(Object o) {
618 throw new UnsupportedOperationException();
619 }
620
621 public void putAll(Map<? extends String, ? extends String> map) {
622 throw new UnsupportedOperationException();
623 }
624
625 public void clear() {
626 throw new UnsupportedOperationException();
627 }
628
629 public Set<String> keySet() {
630 Set<String> set = new LinkedHashSet<String>();
631 for (Entry<String, String> entry : maskedConf) {
632 set.add(entry.getKey());
633 }
634 return set;
635 }
636
637 public Collection<String> values() {
638 Set<String> set = new LinkedHashSet<String>();
639 for (Entry<String, String> entry : maskedConf) {
640 set.add(entry.getValue());
641 }
642 return set;
643 }
644
645 public Set<Entry<String, String>> entrySet() {
646 Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
647 for (Entry<String, String> entry : maskedConf) {
648 set.add(entry);
649 }
650 return set;
651 }
652 };
653 }
654
655 /**
656 * Return all the counters. <p/> This method is thread safe. <p/> The counters are live. The counter value is a
657 * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
658 *
659 * @return all counters.
660 */
661 public Map<String, Map<String, Element<Long>>> getCounters() {
662 return counters;
663 }
664
665 /**
666 * Return all the timers. <p/> This method is thread safe. <p/> The timers are live. Once a timer is obtained, all
667 * its values are consistent (they are snapshot at the time the {@link Instrumentation.Element#getValue()} is
668 * invoked.
669 *
670 * @return all counters.
671 */
672 public Map<String, Map<String, Element<Timer>>> getTimers() {
673 return timers;
674 }
675
676 /**
677 * Return all the variables. <p/> This method is thread safe. <p/> The variables are live. The variable value is a
678 * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
679 *
680 * @return all counters.
681 */
682 public Map<String, Map<String, Element<Variable>>> getVariables() {
683 return variables;
684 }
685
686 /**
687 * Return a map containing all variables, counters and timers.
688 *
689 * @return a map containing all variables, counters and timers.
690 */
691 public Map<String, Map<String, Map<String, Object>>> getAll() {
692 return all;
693 }
694
695 /**
696 * Return the string representation of the instrumentation.
697 *
698 * @return the string representation of the instrumentation.
699 */
700 public String toString() {
701 String E = System.getProperty("line.separator");
702 StringBuilder sb = new StringBuilder(4096);
703 for (String element : all.keySet()) {
704 sb.append(element).append(':').append(E);
705 List<String> groups = new ArrayList<String>(all.get(element).keySet());
706 Collections.sort(groups);
707 for (String group : groups) {
708 sb.append(" ").append(group).append(':').append(E);
709 List<String> names = new ArrayList<String>(all.get(element).get(group).keySet());
710 Collections.sort(names);
711 for (String name : names) {
712 sb.append(" ").append(name).append(": ").append(((Element) all.get(element).
713 get(group).get(name)).getValue()).append(E);
714 }
715 }
716 }
717 return sb.toString();
718 }
719
720 private static class Sampler implements Element<Double>, Runnable {
721 private Lock lock = new ReentrantLock();
722 private int samplingInterval;
723 private Variable<Long> variable;
724 private long[] values;
725 private int current;
726 private long valuesSum;
727 private double rate;
728
729 public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
730 this.samplingInterval = samplingInterval;
731 this.variable = variable;
732 values = new long[samplingPeriod / samplingInterval];
733 valuesSum = 0;
734 current = -1;
735 }
736
737 public int getSamplingInterval() {
738 return samplingInterval;
739 }
740
741 public void run() {
742 try {
743 lock.lock();
744 long newValue = variable.getValue();
745 if (current == -1) {
746 valuesSum = newValue;
747 current = 0;
748 values[current] = newValue;
749 }
750 else {
751 current = (current + 1) % values.length;
752 valuesSum = valuesSum - values[current] + newValue;
753 values[current] = newValue;
754 }
755 rate = ((double) valuesSum) / values.length;
756 }
757 finally {
758 lock.unlock();
759 }
760 }
761
762 public Double getValue() {
763 return rate;
764 }
765 }
766
767 /**
768 * Add a sampling variable. <p/> This method is thread safe.
769 *
770 * @param group timer group.
771 * @param name timer name.
772 * @param period sampling period to compute rate.
773 * @param interval sampling frequency, how often the variable is probed.
774 * @param variable variable to sample.
775 */
776 public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
777 if (scheduler == null) {
778 throw new IllegalStateException("scheduler not set, cannot sample");
779 }
780 try {
781 samplerLock.lock();
782 Map<String, Element<Double>> map = samplers.get(group);
783 if (map == null) {
784 map = samplers.get(group);
785 if (map == null) {
786 map = new HashMap<String, Element<Double>>();
787 samplers.put(group, map);
788 }
789 }
790 if (map.containsKey(name)) {
791 throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
792 }
793 Sampler sampler = new Sampler(period, interval, variable);
794 map.put(name, sampler);
795 scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
796 }
797 finally {
798 samplerLock.unlock();
799 }
800 }
801
802 /**
803 * Return all the samplers. <p/> This method is thread safe. <p/> The samplers are live. The sampler value is a
804 * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
805 *
806 * @return all counters.
807 */
808 public Map<String, Map<String, Element<Double>>> getSamplers() {
809 return samplers;
810 }
811
812 }