This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import org.apache.hadoop.conf.Configuration;
022import org.apache.oozie.service.ConfigurationService;
023import org.apache.oozie.service.Services;
024
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.LinkedHashMap;
030import java.util.LinkedHashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ScheduledExecutorService;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicLong;
038import java.util.concurrent.locks.Lock;
039import java.util.concurrent.locks.ReentrantLock;
040
041/**
042 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p/> All
043 * instrumentation elements have a group and a name.
044 */
045public class Instrumentation {
046    private ScheduledExecutorService scheduler;
047    private Lock counterLock;
048    private Lock timerLock;
049    private Lock variableLock;
050    private Lock samplerLock;
051    private Map<String, Map<String, Map<String, Object>>> all;
052    private Map<String, Map<String, Element<Long>>> counters;
053    private Map<String, Map<String, Element<Timer>>> timers;
054    private Map<String, Map<String, Element<Variable>>> variables;
055    private Map<String, Map<String, Element<Double>>> samplers;
056
057    /**
058     * Instrumentation constructor.
059     */
060    @SuppressWarnings("unchecked")
061    public Instrumentation() {
062        counterLock = new ReentrantLock();
063        timerLock = new ReentrantLock();
064        variableLock = new ReentrantLock();
065        samplerLock = new ReentrantLock();
066        all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
067        counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
068        timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
069        variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
070        samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
071        all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
072        all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
073        all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
074        all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
075    }
076
077    /**
078     * Set the scheduler instance to handle the samplers.
079     *
080     * @param scheduler scheduler instance.
081     */
082    public void setScheduler(ScheduledExecutorService scheduler) {
083        this.scheduler = scheduler;
084    }
085
086    /**
087     * Cron is a stopwatch that can be started/stopped several times. <p/> This class is not thread safe, it does not
088     * need to be. <p/> It keeps track of the total time (first start to last stop) and the running time (total time
089     * minus the stopped intervals). <p/> Once a Cron is complete it must be added to the corresponding group/name in a
090     * Instrumentation instance.
091     */
092    public static class Cron {
093        private long start;
094        private long end;
095        private long lapStart;
096        private long own;
097        private long total;
098        private boolean running;
099
100        /**
101         * Creates new Cron, stopped, in zero.
102         */
103        public Cron() {
104            running = false;
105        }
106
107        /**
108         * Start the cron. It cannot be already started.
109         */
110        public void start() {
111            if (!running) {
112                if (lapStart == 0) {
113                    lapStart = System.currentTimeMillis();
114                    if (start == 0) {
115                        start = lapStart;
116                        end = start;
117                    }
118                }
119                running = true;
120            }
121        }
122
123        /**
124         * Stops the cron. It cannot be already stopped.
125         */
126        public void stop() {
127            if (running) {
128                end = System.currentTimeMillis();
129                if (start == 0) {
130                    start = end;
131                }
132                total = end - start;
133                if (lapStart > 0) {
134                    own += end - lapStart;
135                    lapStart = 0;
136                }
137                running = false;
138            }
139        }
140
141        /**
142         * Return the start time of the cron. It must be stopped.
143         *
144         * @return the start time of the cron.
145         */
146        public long getStart() {
147            if (running) {
148                throw new IllegalStateException("Timer running");
149            }
150            return start;
151        }
152
153        /**
154         * Return the end time of the cron.  It must be stopped.
155         *
156         * @return the end time of the cron.
157         */
158        public long getEnd() {
159            if (running) {
160                throw new IllegalStateException("Timer running");
161            }
162            return end;
163        }
164
165        /**
166         * Return the total time of the cron. It must be stopped.
167         *
168         * @return the total time of the cron.
169         */
170        public long getTotal() {
171            if (running) {
172                throw new IllegalStateException("Timer running");
173            }
174            return total;
175        }
176
177        /**
178         * Return the own time of the cron. It must be stopped.
179         *
180         * @return the own time of the cron.
181         */
182        public long getOwn() {
183            if (running) {
184                throw new IllegalStateException("Timer running");
185            }
186            return own;
187        }
188
189    }
190
191    /**
192     * Gives access to a snapshot of an Instrumentation element (Counter, Timer). <p/> Instrumentation element snapshots
193     * are returned by the {@link Instrumentation#getCounters()} and {@link Instrumentation#getTimers()} ()} methods.
194     */
195    public interface Element<T> {
196
197        /**
198         * Return the snapshot value of the Intrumentation element.
199         *
200         * @return the snapshot value of the Intrumentation element.
201         */
202        T getValue();
203    }
204
205    /**
206     * Counter Instrumentation element.
207     */
208    private static class Counter extends AtomicLong implements Element<Long> {
209
210        /**
211         * Return the counter snapshot.
212         *
213         * @return the counter snapshot.
214         */
215        public Long getValue() {
216            return get();
217        }
218
219        /**
220         * Return the String representation of the counter value.
221         *
222         * @return the String representation of the counter value.
223         */
224        public String toString() {
225            return Long.toString(get());
226        }
227
228    }
229
230    /**
231     * Timer Instrumentation element.
232     */
233    public static class Timer implements Element<Timer> {
234        Lock lock = new ReentrantLock();
235        private long ownTime;
236        private long totalTime;
237        private long ticks;
238        private long ownSquareTime;
239        private long totalSquareTime;
240        private long ownMinTime;
241        private long ownMaxTime;
242        private long totalMinTime;
243        private long totalMaxTime;
244
245        /**
246         * Timer constructor. <p/> It is project private for test purposes.
247         */
248        Timer() {
249        }
250
251        /**
252         * Return the String representation of the timer value.
253         *
254         * @return the String representation of the timer value.
255         */
256        public String toString() {
257            return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
258        }
259
260        /**
261         * Return the timer snapshot.
262         *
263         * @return the timer snapshot.
264         */
265        public Timer getValue() {
266            try {
267                lock.lock();
268                Timer timer = new Timer();
269                timer.ownTime = ownTime;
270                timer.totalTime = totalTime;
271                timer.ticks = ticks;
272                timer.ownSquareTime = ownSquareTime;
273                timer.totalSquareTime = totalSquareTime;
274                timer.ownMinTime = ownMinTime;
275                timer.ownMaxTime = ownMaxTime;
276                timer.totalMinTime = totalMinTime;
277                timer.totalMaxTime = totalMaxTime;
278                return timer;
279            }
280            finally {
281                lock.unlock();
282            }
283        }
284
285        /**
286         * Add a cron to a timer. <p/> It is project private for test purposes.
287         *
288         * @param cron Cron to add.
289         */
290        void addCron(Cron cron) {
291            try {
292                lock.lock();
293                long own = cron.getOwn();
294                long total = cron.getTotal();
295                ownTime += own;
296                totalTime += total;
297                ticks++;
298                ownSquareTime += own * own;
299                totalSquareTime += total * total;
300                if (ticks == 1) {
301                    ownMinTime = own;
302                    ownMaxTime = own;
303                    totalMinTime = total;
304                    totalMaxTime = total;
305                }
306                else {
307                    ownMinTime = Math.min(ownMinTime, own);
308                    ownMaxTime = Math.max(ownMaxTime, own);
309                    totalMinTime = Math.min(totalMinTime, total);
310                    totalMaxTime = Math.max(totalMaxTime, total);
311                }
312            }
313            finally {
314                lock.unlock();
315            }
316        }
317
318        /**
319         * Return the own accumulated computing time by the timer.
320         *
321         * @return own accumulated computing time by the timer.
322         */
323        public long getOwn() {
324            return ownTime;
325        }
326
327        /**
328         * Return the total accumulated computing time by the timer.
329         *
330         * @return total accumulated computing time by the timer.
331         */
332        public long getTotal() {
333            return totalTime;
334        }
335
336        /**
337         * Return the number of times a cron was added to the timer.
338         *
339         * @return the number of times a cron was added to the timer.
340         */
341        public long getTicks() {
342            return ticks;
343        }
344
345        /**
346         * Return the sum of the square own times. <p/> It can be used to calculate the standard deviation.
347         *
348         * @return the sum of the square own timer.
349         */
350        public long getOwnSquareSum() {
351            return ownSquareTime;
352        }
353
354        /**
355         * Return the sum of the square total times. <p/> It can be used to calculate the standard deviation.
356         *
357         * @return the sum of the square own timer.
358         */
359        public long getTotalSquareSum() {
360            return totalSquareTime;
361        }
362
363        /**
364         * Returns the own minimum time.
365         *
366         * @return the own minimum time.
367         */
368        public long getOwnMin() {
369            return ownMinTime;
370        }
371
372        /**
373         * Returns the own maximum time.
374         *
375         * @return the own maximum time.
376         */
377        public long getOwnMax() {
378            return ownMaxTime;
379        }
380
381        /**
382         * Returns the total minimum time.
383         *
384         * @return the total minimum time.
385         */
386        public long getTotalMin() {
387            return totalMinTime;
388        }
389
390        /**
391         * Returns the total maximum time.
392         *
393         * @return the total maximum time.
394         */
395        public long getTotalMax() {
396            return totalMaxTime;
397        }
398
399        /**
400         * Returns the own average time.
401         *
402         * @return the own average time.
403         */
404        public long getOwnAvg() {
405            return (ticks != 0) ? ownTime / ticks : 0;
406        }
407
408        /**
409         * Returns the total average time.
410         *
411         * @return the total average time.
412         */
413        public long getTotalAvg() {
414            return (ticks != 0) ? totalTime / ticks : 0;
415        }
416
417        /**
418         * Returns the total time standard deviation.
419         *
420         * @return the total time standard deviation.
421         */
422        public double getTotalStdDev() {
423            return evalStdDev(ticks, totalTime, totalSquareTime);
424        }
425
426        /**
427         * Returns the own time standard deviation.
428         *
429         * @return the own time standard deviation.
430         */
431        public double getOwnStdDev() {
432            return evalStdDev(ticks, ownTime, ownSquareTime);
433        }
434
435        private double evalStdDev(long n, long sn, long ssn) {
436            return (n < 2) ? -1 : Math.sqrt((n * ssn - sn * sn) / (n * (n - 1)));
437        }
438
439    }
440
441    /**
442     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p/> This method is thread
443     * safe.
444     *
445     * @param group timer group.
446     * @param name timer name.
447     * @param cron cron to add to the timer.
448     */
449    public void addCron(String group, String name, Cron cron) {
450        Map<String, Element<Timer>> map = timers.get(group);
451        if (map == null) {
452            try {
453                timerLock.lock();
454                map = timers.get(group);
455                if (map == null) {
456                    map = new HashMap<String, Element<Timer>>();
457                    timers.put(group, map);
458                }
459            }
460            finally {
461                timerLock.unlock();
462            }
463        }
464        Timer timer = (Timer) map.get(name);
465        if (timer == null) {
466            try {
467                timerLock.lock();
468                timer = (Timer) map.get(name);
469                if (timer == null) {
470                    timer = new Timer();
471                    map.put(name, timer);
472                }
473            }
474            finally {
475                timerLock.unlock();
476            }
477        }
478        timer.addCron(cron);
479    }
480
481    /**
482     * Increment an instrumentation counter. The counter is created if it does not exists. <p/> This method is thread
483     * safe.
484     *
485     * @param group counter group.
486     * @param name counter name.
487     * @param count increment to add to the counter.
488     */
489    public void incr(String group, String name, long count) {
490        Map<String, Element<Long>> map = counters.get(group);
491        if (map == null) {
492            try {
493                counterLock.lock();
494                map = counters.get(group);
495                if (map == null) {
496                    map = new HashMap<String, Element<Long>>();
497                    counters.put(group, map);
498                }
499            }
500            finally {
501                counterLock.unlock();
502            }
503        }
504        Counter counter = (Counter) map.get(name);
505        if (counter == null) {
506            try {
507                counterLock.lock();
508                counter = (Counter) map.get(name);
509                if (counter == null) {
510                    counter = new Counter();
511                    map.put(name, counter);
512                }
513            }
514            finally {
515                counterLock.unlock();
516            }
517        }
518        counter.addAndGet(count);
519    }
520
521    /**
522     * Interface for instrumentation variables. <p/> For example a the database service could expose the number of
523     * currently active connections.
524     */
525    public interface Variable<T> extends Element<T> {
526    }
527
528    /**
529     * Add an instrumentation variable. The variable must not exist. <p/> This method is thread safe.
530     *
531     * @param group counter group.
532     * @param name counter name.
533     * @param variable variable to add.
534     */
535    @SuppressWarnings("unchecked")
536    public void addVariable(String group, String name, Variable variable) {
537        Map<String, Element<Variable>> map = variables.get(group);
538        if (map == null) {
539            try {
540                variableLock.lock();
541                map = variables.get(group);
542                if (map == null) {
543                    map = new HashMap<String, Element<Variable>>();
544                    variables.put(group, map);
545                }
546            }
547            finally {
548                variableLock.unlock();
549            }
550        }
551        if (map.containsKey(name)) {
552            throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
553        }
554        map.put(name, variable);
555    }
556
557    /**
558     * Return the JVM system properties.
559     *
560     * @return JVM system properties.
561     */
562    @SuppressWarnings("unchecked")
563    public Map<String, String> getJavaSystemProperties() {
564        return (Map<String, String>) (Object) System.getProperties();
565    }
566
567    /**
568     * Return the OS environment used to start Oozie.
569     *
570     * @return the OS environment used to start Oozie.
571     */
572    public Map<String, String> getOSEnv() {
573        return System.getenv();
574    }
575
576    /**
577     * Return the current system configuration as a Map<String,String>.
578     *
579     * @return the current system configuration as a Map<String,String>.
580     */
581    public Map<String, String> getConfiguration() {
582        final Configuration maskedConf = Services.get().get(ConfigurationService.class).getMaskedConfiguration();
583
584        return new Map<String, String>() {
585            public int size() {
586                return maskedConf.size();
587            }
588
589            public boolean isEmpty() {
590                return maskedConf.size() == 0;
591            }
592
593            public boolean containsKey(Object o) {
594                return maskedConf.get((String) o) != null;
595            }
596
597            public boolean containsValue(Object o) {
598                throw new UnsupportedOperationException();
599            }
600
601            public String get(Object o) {
602                return maskedConf.get((String) o);
603            }
604
605            public String put(String s, String s1) {
606                throw new UnsupportedOperationException();
607            }
608
609            public String remove(Object o) {
610                throw new UnsupportedOperationException();
611            }
612
613            public void putAll(Map<? extends String, ? extends String> map) {
614                throw new UnsupportedOperationException();
615            }
616
617            public void clear() {
618                throw new UnsupportedOperationException();
619            }
620
621            public Set<String> keySet() {
622                Set<String> set = new LinkedHashSet<String>();
623                for (Entry<String, String> entry : maskedConf) {
624                    set.add(entry.getKey());
625                }
626                return set;
627            }
628
629            public Collection<String> values() {
630                Set<String> set = new LinkedHashSet<String>();
631                for (Entry<String, String> entry : maskedConf) {
632                    set.add(entry.getValue());
633                }
634                return set;
635            }
636
637            public Set<Entry<String, String>> entrySet() {
638                Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
639                for (Entry<String, String> entry : maskedConf) {
640                    set.add(entry);
641                }
642                return set;
643            }
644        };
645    }
646
647    /**
648     * Return all the counters. <p/> This method is thread safe. <p/> The counters are live. The counter value is a
649     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
650     *
651     * @return all counters.
652     */
653    public Map<String, Map<String, Element<Long>>> getCounters() {
654        return counters;
655    }
656
657    /**
658     * Return all the timers. <p/> This method is thread safe. <p/> The timers are live. Once a timer is obtained, all
659     * its values are consistent (they are snapshot at the time the {@link Instrumentation.Element#getValue()} is
660     * invoked.
661     *
662     * @return all counters.
663     */
664    public Map<String, Map<String, Element<Timer>>> getTimers() {
665        return timers;
666    }
667
668    /**
669     * Return all the variables. <p/> This method is thread safe. <p/> The variables are live. The variable value is a
670     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
671     *
672     * @return all counters.
673     */
674    public Map<String, Map<String, Element<Variable>>> getVariables() {
675        return variables;
676    }
677
678    /**
679     * Return a map containing all variables, counters and timers.
680     *
681     * @return a map containing all variables, counters and timers.
682     */
683    public Map<String, Map<String, Map<String, Object>>> getAll() {
684        return all;
685    }
686
687    /**
688     * Return the string representation of the instrumentation.
689     *
690     * @return the string representation of the instrumentation.
691     */
692    public String toString() {
693        String E = System.getProperty("line.separator");
694        StringBuilder sb = new StringBuilder(4096);
695        for (String element : all.keySet()) {
696            sb.append(element).append(':').append(E);
697            List<String> groups = new ArrayList<String>(all.get(element).keySet());
698            Collections.sort(groups);
699            for (String group : groups) {
700                sb.append("  ").append(group).append(':').append(E);
701                List<String> names = new ArrayList<String>(all.get(element).get(group).keySet());
702                Collections.sort(names);
703                for (String name : names) {
704                    sb.append("    ").append(name).append(": ").append(((Element) all.get(element).
705                            get(group).get(name)).getValue()).append(E);
706                }
707            }
708        }
709        return sb.toString();
710    }
711
712    private static class Sampler implements Element<Double>, Runnable {
713        private Lock lock = new ReentrantLock();
714        private int samplingInterval;
715        private Variable<Long> variable;
716        private long[] values;
717        private int current;
718        private long valuesSum;
719        private double rate;
720
721        public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
722            this.samplingInterval = samplingInterval;
723            this.variable = variable;
724            values = new long[samplingPeriod / samplingInterval];
725            valuesSum = 0;
726            current = -1;
727        }
728
729        public int getSamplingInterval() {
730            return samplingInterval;
731        }
732
733        public void run() {
734            try {
735                lock.lock();
736                long newValue = variable.getValue();
737                if (current == -1) {
738                    valuesSum = newValue;
739                    current = 0;
740                    values[current] = newValue;
741                }
742                else {
743                    current = (current + 1) % values.length;
744                    valuesSum = valuesSum - values[current] + newValue;
745                    values[current] = newValue;
746                }
747                rate = ((double) valuesSum) / values.length;
748            }
749            finally {
750                lock.unlock();
751            }
752        }
753
754        public Double getValue() {
755            return rate;
756        }
757    }
758
759    /**
760     * Add a sampling variable. <p/> This method is thread safe.
761     *
762     * @param group timer group.
763     * @param name timer name.
764     * @param period sampling period to compute rate.
765     * @param interval sampling frequency, how often the variable is probed.
766     * @param variable variable to sample.
767     */
768    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
769        if (scheduler == null) {
770            throw new IllegalStateException("scheduler not set, cannot sample");
771        }
772        try {
773            samplerLock.lock();
774            Map<String, Element<Double>> map = samplers.get(group);
775            if (map == null) {
776                map = samplers.get(group);
777                if (map == null) {
778                    map = new HashMap<String, Element<Double>>();
779                    samplers.put(group, map);
780                }
781            }
782            if (map.containsKey(name)) {
783                throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
784            }
785            Sampler sampler = new Sampler(period, interval, variable);
786            map.put(name, sampler);
787            scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
788        }
789        finally {
790            samplerLock.unlock();
791        }
792    }
793
794    /**
795     * Return all the samplers. <p/> This method is thread safe. <p/> The samplers are live. The sampler value is a
796     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
797     *
798     * @return all counters.
799     */
800    public Map<String, Map<String, Element<Double>>> getSamplers() {
801        return samplers;
802    }
803
804}