001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.util;
019    
020    import org.apache.hadoop.conf.Configuration;
021    import org.apache.oozie.service.ConfigurationService;
022    
023    import java.util.ArrayList;
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.HashMap;
027    import java.util.LinkedHashMap;
028    import java.util.LinkedHashSet;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    import java.util.concurrent.ConcurrentHashMap;
033    import java.util.concurrent.ScheduledExecutorService;
034    import java.util.concurrent.TimeUnit;
035    import java.util.concurrent.atomic.AtomicLong;
036    import java.util.concurrent.locks.Lock;
037    import java.util.concurrent.locks.ReentrantLock;
038    
039    /**
040     * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p/> All
041     * instrumentation elements have a group and a name.
042     */
043    public class Instrumentation {
044        private ScheduledExecutorService scheduler;
045        private Lock counterLock;
046        private Lock timerLock;
047        private Lock variableLock;
048        private Lock samplerLock;
049        private Configuration configuration;
050        private Map<String, Map<String, Map<String, Object>>> all;
051        private Map<String, Map<String, Element<Long>>> counters;
052        private Map<String, Map<String, Element<Timer>>> timers;
053        private Map<String, Map<String, Element<Variable>>> variables;
054        private Map<String, Map<String, Element<Double>>> samplers;
055    
056        /**
057         * Instrumentation constructor.
058         */
059        @SuppressWarnings("unchecked")
060        public Instrumentation() {
061            counterLock = new ReentrantLock();
062            timerLock = new ReentrantLock();
063            variableLock = new ReentrantLock();
064            samplerLock = new ReentrantLock();
065            all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
066            counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
067            timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
068            variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
069            samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
070            all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
071            all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
072            all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
073            all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
074        }
075    
    /**
     * Set the scheduler instance used to run the samplers.
     * <p/>
     * A scheduler must be set before adding samplers: {@link #addSampler} throws an
     * <code>IllegalStateException</code> while the scheduler is <code>null</code>.
     *
     * @param scheduler scheduler instance.
     */
    public void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }
084    
085        /**
086         * Cron is a stopwatch that can be started/stopped several times. <p/> This class is not thread safe, it does not
087         * need to be. <p/> It keeps track of the total time (first start to last stop) and the running time (total time
088         * minus the stopped intervals). <p/> Once a Cron is complete it must be added to the corresponding group/name in a
089         * Instrumentation instance.
090         */
091        public static class Cron {
092            private long start;
093            private long end;
094            private long lapStart;
095            private long own;
096            private long total;
097            private boolean running;
098    
099            /**
100             * Creates new Cron, stopped, in zero.
101             */
102            public Cron() {
103                running = false;
104            }
105    
106            /**
107             * Start the cron. It cannot be already started.
108             */
109            public void start() {
110                if (!running) {
111                    if (lapStart == 0) {
112                        lapStart = System.currentTimeMillis();
113                        if (start == 0) {
114                            start = lapStart;
115                            end = start;
116                        }
117                    }
118                    running = true;
119                }
120            }
121    
122            /**
123             * Stops the cron. It cannot be already stopped.
124             */
125            public void stop() {
126                if (running) {
127                    end = System.currentTimeMillis();
128                    if (start == 0) {
129                        start = end;
130                    }
131                    total = end - start;
132                    if (lapStart > 0) {
133                        own += end - lapStart;
134                        lapStart = 0;
135                    }
136                    running = false;
137                }
138            }
139    
140            /**
141             * Return the start time of the cron. It must be stopped.
142             *
143             * @return the start time of the cron.
144             */
145            public long getStart() {
146                if (running) {
147                    throw new IllegalStateException("Timer running");
148                }
149                return start;
150            }
151    
152            /**
153             * Return the end time of the cron.  It must be stopped.
154             *
155             * @return the end time of the cron.
156             */
157            public long getEnd() {
158                if (running) {
159                    throw new IllegalStateException("Timer running");
160                }
161                return end;
162            }
163    
164            /**
165             * Return the total time of the cron. It must be stopped.
166             *
167             * @return the total time of the cron.
168             */
169            public long getTotal() {
170                if (running) {
171                    throw new IllegalStateException("Timer running");
172                }
173                return total;
174            }
175    
176            /**
177             * Return the own time of the cron. It must be stopped.
178             *
179             * @return the own time of the cron.
180             */
181            public long getOwn() {
182                if (running) {
183                    throw new IllegalStateException("Timer running");
184                }
185                return own;
186            }
187    
188        }
189    
    /**
     * Gives access to a snapshot of an Instrumentation element (Counter, Timer, Variable, Sampler).
     * <p/>
     * Instrumentation elements are returned by the {@link Instrumentation#getCounters()},
     * {@link Instrumentation#getTimers()}, {@link Instrumentation#getVariables()} and
     * {@link Instrumentation#getSamplers()} methods.
     */
    public interface Element<T> {

        /**
         * Return the snapshot value of the Instrumentation element.
         *
         * @return the snapshot value of the Instrumentation element.
         */
        T getValue();
    }
203    
204        /**
205         * Counter Instrumentation element.
206         */
207        private static class Counter extends AtomicLong implements Element<Long> {
208    
209            /**
210             * Return the counter snapshot.
211             *
212             * @return the counter snapshot.
213             */
214            public Long getValue() {
215                return get();
216            }
217    
218            /**
219             * Return the String representation of the counter value.
220             *
221             * @return the String representation of the counter value.
222             */
223            public String toString() {
224                return Long.toString(get());
225            }
226    
227        }
228    
229        /**
230         * Timer Instrumentation element.
231         */
232        public static class Timer implements Element<Timer> {
233            Lock lock = new ReentrantLock();
234            private long ownTime;
235            private long totalTime;
236            private long ticks;
237            private long ownSquareTime;
238            private long totalSquareTime;
239            private long ownMinTime;
240            private long ownMaxTime;
241            private long totalMinTime;
242            private long totalMaxTime;
243    
244            /**
245             * Timer constructor. <p/> It is project private for test purposes.
246             */
247            Timer() {
248            }
249    
250            /**
251             * Return the String representation of the timer value.
252             *
253             * @return the String representation of the timer value.
254             */
255            public String toString() {
256                return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
257            }
258    
259            /**
260             * Return the timer snapshot.
261             *
262             * @return the timer snapshot.
263             */
264            public Timer getValue() {
265                try {
266                    lock.lock();
267                    Timer timer = new Timer();
268                    timer.ownTime = ownTime;
269                    timer.totalTime = totalTime;
270                    timer.ticks = ticks;
271                    timer.ownSquareTime = ownSquareTime;
272                    timer.totalSquareTime = totalSquareTime;
273                    timer.ownMinTime = ownMinTime;
274                    timer.ownMaxTime = ownMaxTime;
275                    timer.totalMinTime = totalMinTime;
276                    timer.totalMaxTime = totalMaxTime;
277                    return timer;
278                }
279                finally {
280                    lock.unlock();
281                }
282            }
283    
284            /**
285             * Add a cron to a timer. <p/> It is project private for test purposes.
286             *
287             * @param cron Cron to add.
288             */
289            void addCron(Cron cron) {
290                try {
291                    lock.lock();
292                    long own = cron.getOwn();
293                    long total = cron.getTotal();
294                    ownTime += own;
295                    totalTime += total;
296                    ticks++;
297                    ownSquareTime += own * own;
298                    totalSquareTime += total * total;
299                    if (ticks == 1) {
300                        ownMinTime = own;
301                        ownMaxTime = own;
302                        totalMinTime = total;
303                        totalMaxTime = total;
304                    }
305                    else {
306                        ownMinTime = Math.min(ownMinTime, own);
307                        ownMaxTime = Math.max(ownMaxTime, own);
308                        totalMinTime = Math.min(totalMinTime, total);
309                        totalMaxTime = Math.max(totalMaxTime, total);
310                    }
311                }
312                finally {
313                    lock.unlock();
314                }
315            }
316    
317            /**
318             * Return the own accumulated computing time by the timer.
319             *
320             * @return own accumulated computing time by the timer.
321             */
322            public long getOwn() {
323                return ownTime;
324            }
325    
326            /**
327             * Return the total accumulated computing time by the timer.
328             *
329             * @return total accumulated computing time by the timer.
330             */
331            public long getTotal() {
332                return totalTime;
333            }
334    
335            /**
336             * Return the number of times a cron was added to the timer.
337             *
338             * @return the number of times a cron was added to the timer.
339             */
340            public long getTicks() {
341                return ticks;
342            }
343    
344            /**
345             * Return the sum of the square own times. <p/> It can be used to calculate the standard deviation.
346             *
347             * @return the sum of the square own timer.
348             */
349            public long getOwnSquareSum() {
350                return ownSquareTime;
351            }
352    
353            /**
354             * Return the sum of the square total times. <p/> It can be used to calculate the standard deviation.
355             *
356             * @return the sum of the square own timer.
357             */
358            public long getTotalSquareSum() {
359                return totalSquareTime;
360            }
361    
362            /**
363             * Returns the own minimum time.
364             *
365             * @return the own minimum time.
366             */
367            public long getOwnMin() {
368                return ownMinTime;
369            }
370    
371            /**
372             * Returns the own maximum time.
373             *
374             * @return the own maximum time.
375             */
376            public long getOwnMax() {
377                return ownMaxTime;
378            }
379    
380            /**
381             * Returns the total minimum time.
382             *
383             * @return the total minimum time.
384             */
385            public long getTotalMin() {
386                return totalMinTime;
387            }
388    
389            /**
390             * Returns the total maximum time.
391             *
392             * @return the total maximum time.
393             */
394            public long getTotalMax() {
395                return totalMaxTime;
396            }
397    
398            /**
399             * Returns the own average time.
400             *
401             * @return the own average time.
402             */
403            public long getOwnAvg() {
404                return (ticks != 0) ? ownTime / ticks : 0;
405            }
406    
407            /**
408             * Returns the total average time.
409             *
410             * @return the total average time.
411             */
412            public long getTotalAvg() {
413                return (ticks != 0) ? totalTime / ticks : 0;
414            }
415    
416            /**
417             * Returns the total time standard deviation.
418             *
419             * @return the total time standard deviation.
420             */
421            public double getTotalStdDev() {
422                return evalStdDev(ticks, totalTime, totalSquareTime);
423            }
424    
425            /**
426             * Returns the own time standard deviation.
427             *
428             * @return the own time standard deviation.
429             */
430            public double getOwnStdDev() {
431                return evalStdDev(ticks, ownTime, ownSquareTime);
432            }
433    
434            private double evalStdDev(long n, long sn, long ssn) {
435                return (n < 2) ? -1 : Math.sqrt((n * ssn - sn * sn) / (n * (n - 1)));
436            }
437    
438        }
439    
440        /**
441         * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p/> This method is thread
442         * safe.
443         *
444         * @param group timer group.
445         * @param name timer name.
446         * @param cron cron to add to the timer.
447         */
448        public void addCron(String group, String name, Cron cron) {
449            Map<String, Element<Timer>> map = timers.get(group);
450            if (map == null) {
451                try {
452                    timerLock.lock();
453                    map = timers.get(group);
454                    if (map == null) {
455                        map = new HashMap<String, Element<Timer>>();
456                        timers.put(group, map);
457                    }
458                }
459                finally {
460                    timerLock.unlock();
461                }
462            }
463            Timer timer = (Timer) map.get(name);
464            if (timer == null) {
465                try {
466                    timerLock.lock();
467                    timer = (Timer) map.get(name);
468                    if (timer == null) {
469                        timer = new Timer();
470                        map.put(name, timer);
471                    }
472                }
473                finally {
474                    timerLock.unlock();
475                }
476            }
477            timer.addCron(cron);
478        }
479    
480        /**
481         * Increment an instrumentation counter. The counter is created if it does not exists. <p/> This method is thread
482         * safe.
483         *
484         * @param group counter group.
485         * @param name counter name.
486         * @param count increment to add to the counter.
487         */
488        public void incr(String group, String name, long count) {
489            Map<String, Element<Long>> map = counters.get(group);
490            if (map == null) {
491                try {
492                    counterLock.lock();
493                    map = counters.get(group);
494                    if (map == null) {
495                        map = new HashMap<String, Element<Long>>();
496                        counters.put(group, map);
497                    }
498                }
499                finally {
500                    counterLock.unlock();
501                }
502            }
503            Counter counter = (Counter) map.get(name);
504            if (counter == null) {
505                try {
506                    counterLock.lock();
507                    counter = (Counter) map.get(name);
508                    if (counter == null) {
509                        counter = new Counter();
510                        map.put(name, counter);
511                    }
512                }
513                finally {
514                    counterLock.unlock();
515                }
516            }
517            counter.addAndGet(count);
518        }
519    
    /**
     * Interface for instrumentation variables. <p/> For example, the database service could expose the number of
     * currently active connections.
     * <p/>
     * It adds no methods to {@link Element}; the value of the variable is obtained with
     * {@link Element#getValue()}.
     */
    public interface Variable<T> extends Element<T> {
    }
526    
527        /**
528         * Add an instrumentation variable. The variable must not exist. <p/> This method is thread safe.
529         *
530         * @param group counter group.
531         * @param name counter name.
532         * @param variable variable to add.
533         */
534        @SuppressWarnings("unchecked")
535        public void addVariable(String group, String name, Variable variable) {
536            Map<String, Element<Variable>> map = variables.get(group);
537            if (map == null) {
538                try {
539                    variableLock.lock();
540                    map = variables.get(group);
541                    if (map == null) {
542                        map = new HashMap<String, Element<Variable>>();
543                        variables.put(group, map);
544                    }
545                }
546                finally {
547                    variableLock.unlock();
548                }
549            }
550            if (map.containsKey(name)) {
551                throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
552            }
553            map.put(name, variable);
554        }
555    
    /**
     * Set the system configuration.
     * <p/>
     * The configuration set here is the one exposed, after being passed through
     * <code>ConfigurationService.maskPasswords</code>, by {@link #getConfiguration()}.
     *
     * @param configuration system configuration.
     */
    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }
564    
565        /**
566         * Return the JVM system properties.
567         *
568         * @return JVM system properties.
569         */
570        @SuppressWarnings("unchecked")
571        public Map<String, String> getJavaSystemProperties() {
572            return (Map<String, String>) (Object) System.getProperties();
573        }
574    
575        /**
576         * Return the OS environment used to start Oozie.
577         *
578         * @return the OS environment used to start Oozie.
579         */
580        public Map<String, String> getOSEnv() {
581            return System.getenv();
582        }
583    
584        /**
585         * Return the current system configuration as a Map<String,String>.
586         *
587         * @return the current system configuration as a Map<String,String>.
588         */
589        public Map<String, String> getConfiguration() {
590            final Configuration maskedConf = ConfigurationService.maskPasswords(configuration);
591    
592            return new Map<String, String>() {
593                public int size() {
594                    return maskedConf.size();
595                }
596    
597                public boolean isEmpty() {
598                    return maskedConf.size() == 0;
599                }
600    
601                public boolean containsKey(Object o) {
602                    return maskedConf.get((String) o) != null;
603                }
604    
605                public boolean containsValue(Object o) {
606                    throw new UnsupportedOperationException();
607                }
608    
609                public String get(Object o) {
610                    return maskedConf.get((String) o);
611                }
612    
613                public String put(String s, String s1) {
614                    throw new UnsupportedOperationException();
615                }
616    
617                public String remove(Object o) {
618                    throw new UnsupportedOperationException();
619                }
620    
621                public void putAll(Map<? extends String, ? extends String> map) {
622                    throw new UnsupportedOperationException();
623                }
624    
625                public void clear() {
626                    throw new UnsupportedOperationException();
627                }
628    
629                public Set<String> keySet() {
630                    Set<String> set = new LinkedHashSet<String>();
631                    for (Entry<String, String> entry : maskedConf) {
632                        set.add(entry.getKey());
633                    }
634                    return set;
635                }
636    
637                public Collection<String> values() {
638                    Set<String> set = new LinkedHashSet<String>();
639                    for (Entry<String, String> entry : maskedConf) {
640                        set.add(entry.getValue());
641                    }
642                    return set;
643                }
644    
645                public Set<Entry<String, String>> entrySet() {
646                    Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
647                    for (Entry<String, String> entry : maskedConf) {
648                        set.add(entry);
649                    }
650                    return set;
651                }
652            };
653        }
654    
655        /**
656         * Return all the counters. <p/> This method is thread safe. <p/> The counters are live. The counter value is a
657         * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
658         *
659         * @return all counters.
660         */
661        public Map<String, Map<String, Element<Long>>> getCounters() {
662            return counters;
663        }
664    
665        /**
666         * Return all the timers. <p/> This method is thread safe. <p/> The timers are live. Once a timer is obtained, all
667         * its values are consistent (they are snapshot at the time the {@link Instrumentation.Element#getValue()} is
668         * invoked.
669         *
670         * @return all counters.
671         */
672        public Map<String, Map<String, Element<Timer>>> getTimers() {
673            return timers;
674        }
675    
676        /**
677         * Return all the variables. <p/> This method is thread safe. <p/> The variables are live. The variable value is a
678         * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
679         *
680         * @return all counters.
681         */
682        public Map<String, Map<String, Element<Variable>>> getVariables() {
683            return variables;
684        }
685    
686        /**
687         * Return a map containing all variables, counters and timers.
688         *
689         * @return a map containing all variables, counters and timers.
690         */
691        public Map<String, Map<String, Map<String, Object>>> getAll() {
692            return all;
693        }
694    
695        /**
696         * Return the string representation of the instrumentation.
697         *
698         * @return the string representation of the instrumentation.
699         */
700        public String toString() {
701            String E = System.getProperty("line.separator");
702            StringBuilder sb = new StringBuilder(4096);
703            for (String element : all.keySet()) {
704                sb.append(element).append(':').append(E);
705                List<String> groups = new ArrayList<String>(all.get(element).keySet());
706                Collections.sort(groups);
707                for (String group : groups) {
708                    sb.append("  ").append(group).append(':').append(E);
709                    List<String> names = new ArrayList<String>(all.get(element).get(group).keySet());
710                    Collections.sort(names);
711                    for (String name : names) {
712                        sb.append("    ").append(name).append(": ").append(((Element) all.get(element).
713                                get(group).get(name)).getValue()).append(E);
714                    }
715                }
716            }
717            return sb.toString();
718        }
719    
720        private static class Sampler implements Element<Double>, Runnable {
721            private Lock lock = new ReentrantLock();
722            private int samplingInterval;
723            private Variable<Long> variable;
724            private long[] values;
725            private int current;
726            private long valuesSum;
727            private double rate;
728    
729            public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
730                this.samplingInterval = samplingInterval;
731                this.variable = variable;
732                values = new long[samplingPeriod / samplingInterval];
733                valuesSum = 0;
734                current = -1;
735            }
736    
737            public int getSamplingInterval() {
738                return samplingInterval;
739            }
740    
741            public void run() {
742                try {
743                    lock.lock();
744                    long newValue = variable.getValue();
745                    if (current == -1) {
746                        valuesSum = newValue;
747                        current = 0;
748                        values[current] = newValue;
749                    }
750                    else {
751                        current = (current + 1) % values.length;
752                        valuesSum = valuesSum - values[current] + newValue;
753                        values[current] = newValue;
754                    }
755                    rate = ((double) valuesSum) / values.length;
756                }
757                finally {
758                    lock.unlock();
759                }
760            }
761    
762            public Double getValue() {
763                return rate;
764            }
765        }
766    
767        /**
768         * Add a sampling variable. <p/> This method is thread safe.
769         *
770         * @param group timer group.
771         * @param name timer name.
772         * @param period sampling period to compute rate.
773         * @param interval sampling frequency, how often the variable is probed.
774         * @param variable variable to sample.
775         */
776        public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
777            if (scheduler == null) {
778                throw new IllegalStateException("scheduler not set, cannot sample");
779            }
780            try {
781                samplerLock.lock();
782                Map<String, Element<Double>> map = samplers.get(group);
783                if (map == null) {
784                    map = samplers.get(group);
785                    if (map == null) {
786                        map = new HashMap<String, Element<Double>>();
787                        samplers.put(group, map);
788                    }
789                }
790                if (map.containsKey(name)) {
791                    throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
792                }
793                Sampler sampler = new Sampler(period, interval, variable);
794                map.put(name, sampler);
795                scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
796            }
797            finally {
798                samplerLock.unlock();
799            }
800        }
801    
802        /**
803         * Return all the samplers. <p/> This method is thread safe. <p/> The samplers are live. The sampler value is a
804         * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
805         *
806         * @return all counters.
807         */
808        public Map<String, Map<String, Element<Double>>> getSamplers() {
809            return samplers;
810        }
811    
812    }