001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import com.google.common.collect.Maps;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.oozie.service.ConfigurationService;
024import org.apache.oozie.service.Services;
025
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.LinkedHashMap;
031import java.util.LinkedHashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.ScheduledExecutorService;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicLong;
039import java.util.concurrent.locks.Lock;
040import java.util.concurrent.locks.ReentrantLock;
041
042/**
043 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p> All
044 * instrumentation elements have a group and a name.
045 */
046public class Instrumentation {
    // Executor used to run samplers; stays null until setScheduler() is called.
    private ScheduledExecutorService scheduler;
    // One lock per element type, used when creating entries in the corresponding map below.
    private Lock counterLock;
    private Lock timerLock;
    private Lock variableLock;
    private Lock samplerLock;
    // Aggregate map: element type name ("variables", "samplers", "counters", "timers") -> the maps below.
    private Map<String, Map<String, Map<String, Object>>> all;
    // Element maps, each organized as: group name -> element name -> element.
    private Map<String, Map<String, Element<Long>>> counters;
    private Map<String, Map<String, Element<Timer>>> timers;
    private Map<String, Map<String, Element<Variable>>> variables;
    private Map<String, Map<String, Element<Double>>> samplers;
057
058    /**
059     * Instrumentation constructor.
060     */
061    @SuppressWarnings("unchecked")
062    public Instrumentation() {
063        counterLock = new ReentrantLock();
064        timerLock = new ReentrantLock();
065        variableLock = new ReentrantLock();
066        samplerLock = new ReentrantLock();
067        all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
068        counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
069        timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
070        variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
071        samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
072        all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
073        all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
074        all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
075        all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
076    }
077
    /**
     * Set the scheduler instance used to run the samplers. <p> {@link #addSampler} throws an
     * <code>IllegalStateException</code> while no scheduler has been set.
     *
     * @param scheduler scheduler instance.
     */
    public void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }
086
087    /**
088     * Cron is a stopwatch that can be started/stopped several times. <p> This class is not thread safe, it does not
089     * need to be. <p> It keeps track of the total time (first start to last stop) and the running time (total time
090     * minus the stopped intervals). <p> Once a Cron is complete it must be added to the corresponding group/name in a
091     * Instrumentation instance.
092     */
093    public static class Cron {
094        private long start;
095        private long end;
096        private long lapStart;
097        private long own;
098        private long total;
099        private boolean running;
100
101        /**
102         * Creates new Cron, stopped, in zero.
103         */
104        public Cron() {
105            running = false;
106        }
107
108        /**
109         * Start the cron. It cannot be already started.
110         */
111        public void start() {
112            if (!running) {
113                if (lapStart == 0) {
114                    lapStart = System.currentTimeMillis();
115                    if (start == 0) {
116                        start = lapStart;
117                        end = start;
118                    }
119                }
120                running = true;
121            }
122        }
123
124        /**
125         * Stops the cron. It cannot be already stopped.
126         */
127        public void stop() {
128            if (running) {
129                end = System.currentTimeMillis();
130                if (start == 0) {
131                    start = end;
132                }
133                total = end - start;
134                if (lapStart > 0) {
135                    own += end - lapStart;
136                    lapStart = 0;
137                }
138                running = false;
139            }
140        }
141
142        /**
143         * Return the start time of the cron. It must be stopped.
144         *
145         * @return the start time of the cron.
146         */
147        public long getStart() {
148            if (running) {
149                throw new IllegalStateException("Timer running");
150            }
151            return start;
152        }
153
154        /**
155         * Return the end time of the cron.  It must be stopped.
156         *
157         * @return the end time of the cron.
158         */
159        public long getEnd() {
160            if (running) {
161                throw new IllegalStateException("Timer running");
162            }
163            return end;
164        }
165
166        /**
167         * Return the total time of the cron. It must be stopped.
168         *
169         * @return the total time of the cron.
170         */
171        public long getTotal() {
172            if (running) {
173                throw new IllegalStateException("Timer running");
174            }
175            return total;
176        }
177
178        /**
179         * Return the own time of the cron. It must be stopped.
180         *
181         * @return the own time of the cron.
182         */
183        public long getOwn() {
184            if (running) {
185                throw new IllegalStateException("Timer running");
186            }
187            return own;
188        }
189
190    }
191
    /**
     * Gives access to a snapshot of an Instrumentation element (Counter, Timer). <p> Instrumentation element snapshots
     * are returned by the {@link Instrumentation#getCounters()} and {@link Instrumentation#getTimers()} methods.
     */
    public interface Element<T> {

        /**
         * Return the snapshot value of the Instrumentation element.
         *
         * @return the snapshot value of the Instrumentation element.
         */
        T getValue();
    }
205
206    /**
207     * Counter Instrumentation element.
208     */
209    private static class Counter extends AtomicLong implements Element<Long> {
210
211        /**
212         * Return the counter snapshot.
213         *
214         * @return the counter snapshot.
215         */
216        public Long getValue() {
217            return get();
218        }
219
220        /**
221         * Return the String representation of the counter value.
222         *
223         * @return the String representation of the counter value.
224         */
225        public String toString() {
226            return Long.toString(get());
227        }
228
229    }
230
231    /**
232     * Timer Instrumentation element.
233     */
234    public static class Timer implements Element<Timer> {
235        Lock lock = new ReentrantLock();
236        private long ownTime;
237        private long totalTime;
238        private long ticks;
239        private long ownSquareTime;
240        private long totalSquareTime;
241        private long ownMinTime;
242        private long ownMaxTime;
243        private long totalMinTime;
244        private long totalMaxTime;
245
246        /**
247         * Timer constructor. <p> It is project private for test purposes.
248         */
249        Timer() {
250        }
251
252        /**
253         * Return the String representation of the timer value.
254         *
255         * @return the String representation of the timer value.
256         */
257        public String toString() {
258            return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
259        }
260
261        /**
262         * Return the timer snapshot.
263         *
264         * @return the timer snapshot.
265         */
266        public Timer getValue() {
267            try {
268                lock.lock();
269                Timer timer = new Timer();
270                timer.ownTime = ownTime;
271                timer.totalTime = totalTime;
272                timer.ticks = ticks;
273                timer.ownSquareTime = ownSquareTime;
274                timer.totalSquareTime = totalSquareTime;
275                timer.ownMinTime = ownMinTime;
276                timer.ownMaxTime = ownMaxTime;
277                timer.totalMinTime = totalMinTime;
278                timer.totalMaxTime = totalMaxTime;
279                return timer;
280            }
281            finally {
282                lock.unlock();
283            }
284        }
285
286        /**
287         * Add a cron to a timer. <p> It is project private for test purposes.
288         *
289         * @param cron Cron to add.
290         */
291        void addCron(Cron cron) {
292            try {
293                lock.lock();
294                long own = cron.getOwn();
295                long total = cron.getTotal();
296                ownTime += own;
297                totalTime += total;
298                ticks++;
299                ownSquareTime += own * own;
300                totalSquareTime += total * total;
301                if (ticks == 1) {
302                    ownMinTime = own;
303                    ownMaxTime = own;
304                    totalMinTime = total;
305                    totalMaxTime = total;
306                }
307                else {
308                    ownMinTime = Math.min(ownMinTime, own);
309                    ownMaxTime = Math.max(ownMaxTime, own);
310                    totalMinTime = Math.min(totalMinTime, total);
311                    totalMaxTime = Math.max(totalMaxTime, total);
312                }
313            }
314            finally {
315                lock.unlock();
316            }
317        }
318
319        /**
320         * Return the own accumulated computing time by the timer.
321         *
322         * @return own accumulated computing time by the timer.
323         */
324        public long getOwn() {
325            return ownTime;
326        }
327
328        /**
329         * Return the total accumulated computing time by the timer.
330         *
331         * @return total accumulated computing time by the timer.
332         */
333        public long getTotal() {
334            return totalTime;
335        }
336
337        /**
338         * Return the number of times a cron was added to the timer.
339         *
340         * @return the number of times a cron was added to the timer.
341         */
342        public long getTicks() {
343            return ticks;
344        }
345
346        /**
347         * Return the sum of the square own times. <p> It can be used to calculate the standard deviation.
348         *
349         * @return the sum of the square own timer.
350         */
351        public long getOwnSquareSum() {
352            return ownSquareTime;
353        }
354
355        /**
356         * Return the sum of the square total times. <p> It can be used to calculate the standard deviation.
357         *
358         * @return the sum of the square own timer.
359         */
360        public long getTotalSquareSum() {
361            return totalSquareTime;
362        }
363
364        /**
365         * Returns the own minimum time.
366         *
367         * @return the own minimum time.
368         */
369        public long getOwnMin() {
370            return ownMinTime;
371        }
372
373        /**
374         * Returns the own maximum time.
375         *
376         * @return the own maximum time.
377         */
378        public long getOwnMax() {
379            return ownMaxTime;
380        }
381
382        /**
383         * Returns the total minimum time.
384         *
385         * @return the total minimum time.
386         */
387        public long getTotalMin() {
388            return totalMinTime;
389        }
390
391        /**
392         * Returns the total maximum time.
393         *
394         * @return the total maximum time.
395         */
396        public long getTotalMax() {
397            return totalMaxTime;
398        }
399
400        /**
401         * Returns the own average time.
402         *
403         * @return the own average time.
404         */
405        public long getOwnAvg() {
406            return (ticks != 0) ? ownTime / ticks : 0;
407        }
408
409        /**
410         * Returns the total average time.
411         *
412         * @return the total average time.
413         */
414        public long getTotalAvg() {
415            return (ticks != 0) ? totalTime / ticks : 0;
416        }
417
418        /**
419         * Returns the total time standard deviation.
420         *
421         * @return the total time standard deviation.
422         */
423        public double getTotalStdDev() {
424            return evalStdDev(ticks, totalTime, totalSquareTime);
425        }
426
427        /**
428         * Returns the own time standard deviation.
429         *
430         * @return the own time standard deviation.
431         */
432        public double getOwnStdDev() {
433            return evalStdDev(ticks, ownTime, ownSquareTime);
434        }
435
436        private double evalStdDev(long n, long sn, long ssn) {
437            return (n < 2) ? -1 : Math.sqrt((n * ssn - sn * sn) / (n * (n - 1)));
438        }
439
440    }
441
442    /**
443     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p> This method is thread
444     * safe.
445     *
446     * @param group timer group.
447     * @param name timer name.
448     * @param cron cron to add to the timer.
449     */
450    public void addCron(String group, String name, Cron cron) {
451        Map<String, Element<Timer>> map = timers.get(group);
452        if (map == null) {
453            try {
454                timerLock.lock();
455                map = timers.get(group);
456                if (map == null) {
457                    map = new HashMap<String, Element<Timer>>();
458                    timers.put(group, map);
459                }
460            }
461            finally {
462                timerLock.unlock();
463            }
464        }
465        Timer timer = (Timer) map.get(name);
466        if (timer == null) {
467            try {
468                timerLock.lock();
469                timer = (Timer) map.get(name);
470                if (timer == null) {
471                    timer = new Timer();
472                    map.put(name, timer);
473                }
474            }
475            finally {
476                timerLock.unlock();
477            }
478        }
479        timer.addCron(cron);
480    }
481
482    /**
483     * Increment an instrumentation counter. The counter is created if it does not exists. <p> This method is thread
484     * safe.
485     *
486     * @param group counter group.
487     * @param name counter name.
488     * @param count increment to add to the counter.
489     */
490    public void incr(String group, String name, long count) {
491        Map<String, Element<Long>> map = counters.get(group);
492        if (map == null) {
493            try {
494                counterLock.lock();
495                map = counters.get(group);
496                if (map == null) {
497                    map = new HashMap<String, Element<Long>>();
498                    counters.put(group, map);
499                }
500            }
501            finally {
502                counterLock.unlock();
503            }
504        }
505        Counter counter = (Counter) map.get(name);
506        if (counter == null) {
507            try {
508                counterLock.lock();
509                counter = (Counter) map.get(name);
510                if (counter == null) {
511                    counter = new Counter();
512                    map.put(name, counter);
513                }
514            }
515            finally {
516                counterLock.unlock();
517            }
518        }
519        counter.addAndGet(count);
520    }
521
    /**
     * Interface for instrumentation variables. <p> For example, the database service could expose the number of
     * currently active connections.
     */
    public interface Variable<T> extends Element<T> {
    }
528
529    /**
530     * Add an instrumentation variable. The variable must not exist. <p> This method is thread safe.
531     *
532     * @param group counter group.
533     * @param name counter name.
534     * @param variable variable to add.
535     */
536    @SuppressWarnings("unchecked")
537    public void addVariable(String group, String name, Variable variable) {
538        Map<String, Element<Variable>> map = variables.get(group);
539        if (map == null) {
540            try {
541                variableLock.lock();
542                map = variables.get(group);
543                if (map == null) {
544                    map = new HashMap<String, Element<Variable>>();
545                    variables.put(group, map);
546                }
547            }
548            finally {
549                variableLock.unlock();
550            }
551        }
552        if (map.containsKey(name)) {
553            throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
554        }
555        map.put(name, variable);
556    }
557
558    /**
559     * Return the JVM system properties.
560     *
561     * @return JVM system properties.
562     */
563    public Map<String, String> getJavaSystemProperties() {
564        Map<String, String> unmasked = Maps.fromProperties(System.getProperties());
565        return new PasswordMasker().mask(unmasked);
566    }
567
568    /**
569     * Return the OS environment used to start Oozie.
570     *
571     * @return the OS environment used to start Oozie.
572     */
573    public Map<String, String> getOSEnv() {
574        Map<String, String> unmasked = System.getenv();
575        return new PasswordMasker().mask(unmasked);
576    }
577
578    /**
579     * Return the current system configuration as a Map&lt;String,String&gt;.
580     *
581     * @return the current system configuration as a Map&lt;String,String&gt;.
582     */
583    public Map<String, String> getConfiguration() {
584        final Configuration maskedConf = Services.get().get(ConfigurationService.class).getMaskedConfiguration();
585
586        return new Map<String, String>() {
587            public int size() {
588                return maskedConf.size();
589            }
590
591            public boolean isEmpty() {
592                return maskedConf.size() == 0;
593            }
594
595            public boolean containsKey(Object o) {
596                return maskedConf.get((String) o) != null;
597            }
598
599            public boolean containsValue(Object o) {
600                throw new UnsupportedOperationException();
601            }
602
603            public String get(Object o) {
604                return maskedConf.get((String) o);
605            }
606
607            public String put(String s, String s1) {
608                throw new UnsupportedOperationException();
609            }
610
611            public String remove(Object o) {
612                throw new UnsupportedOperationException();
613            }
614
615            public void putAll(Map<? extends String, ? extends String> map) {
616                throw new UnsupportedOperationException();
617            }
618
619            public void clear() {
620                throw new UnsupportedOperationException();
621            }
622
623            public Set<String> keySet() {
624                Set<String> set = new LinkedHashSet<String>();
625                for (Entry<String, String> entry : maskedConf) {
626                    set.add(entry.getKey());
627                }
628                return set;
629            }
630
631            public Collection<String> values() {
632                Set<String> set = new LinkedHashSet<String>();
633                for (Entry<String, String> entry : maskedConf) {
634                    set.add(entry.getValue());
635                }
636                return set;
637            }
638
639            public Set<Entry<String, String>> entrySet() {
640                Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
641                for (Entry<String, String> entry : maskedConf) {
642                    set.add(entry);
643                }
644                return set;
645            }
646        };
647    }
648
    /**
     * Return all the counters, organized by group and then by name. <p> The returned map is the live internal map,
     * not a copy. The counter value is a snapshot at the time the {@link Instrumentation.Element#getValue()} is
     * invoked.
     *
     * @return all counters.
     */
    public Map<String, Map<String, Element<Long>>> getCounters() {
        return counters;
    }
658
    /**
     * Return all the timers, organized by group and then by name. <p> The returned map is the live internal map,
     * not a copy. Once a timer snapshot is obtained through {@link Instrumentation.Element#getValue()}, all its
     * values are consistent (they are copied together at the time of the invocation).
     *
     * @return all timers.
     */
    public Map<String, Map<String, Element<Timer>>> getTimers() {
        return timers;
    }
669
    /**
     * Return all the variables, organized by group and then by name. <p> The returned map is the live internal map,
     * not a copy. The variable value is a snapshot at the time the {@link Instrumentation.Element#getValue()} is
     * invoked.
     *
     * @return all variables.
     */
    public Map<String, Map<String, Element<Variable>>> getVariables() {
        return variables;
    }
679
    /**
     * Return a map containing all variables, samplers, counters and timers, keyed by element type name
     * ("variables", "samplers", "counters", "timers"). <p> The returned map is the live internal map, not a copy.
     *
     * @return a map containing all variables, samplers, counters and timers.
     */
    public Map<String, Map<String, Map<String, Object>>> getAll() {
        return all;
    }
688
689    /**
690     * Return the string representation of the instrumentation.
691     *
692     * @return the string representation of the instrumentation.
693     */
694    public String toString() {
695        String E = System.getProperty("line.separator");
696        StringBuilder sb = new StringBuilder(4096);
697        for (String element : all.keySet()) {
698            sb.append(element).append(':').append(E);
699            List<String> groups = new ArrayList<String>(all.get(element).keySet());
700            Collections.sort(groups);
701            for (String group : groups) {
702                sb.append("  ").append(group).append(':').append(E);
703                List<String> names = new ArrayList<String>(all.get(element).get(group).keySet());
704                Collections.sort(names);
705                for (String name : names) {
706                    sb.append("    ").append(name).append(": ").append(((Element) all.get(element).
707                            get(group).get(name)).getValue()).append(E);
708                }
709            }
710        }
711        return sb.toString();
712    }
713
714    private static class Sampler implements Element<Double>, Runnable {
715        private Lock lock = new ReentrantLock();
716        private int samplingInterval;
717        private Variable<Long> variable;
718        private long[] values;
719        private int current;
720        private long valuesSum;
721        private double rate;
722
723        public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
724            this.samplingInterval = samplingInterval;
725            this.variable = variable;
726            values = new long[samplingPeriod / samplingInterval];
727            valuesSum = 0;
728            current = -1;
729        }
730
731        public int getSamplingInterval() {
732            return samplingInterval;
733        }
734
735        public void run() {
736            try {
737                lock.lock();
738                long newValue = variable.getValue();
739                if (current == -1) {
740                    valuesSum = newValue;
741                    current = 0;
742                    values[current] = newValue;
743                }
744                else {
745                    current = (current + 1) % values.length;
746                    valuesSum = valuesSum - values[current] + newValue;
747                    values[current] = newValue;
748                }
749                rate = ((double) valuesSum) / values.length;
750            }
751            finally {
752                lock.unlock();
753            }
754        }
755
756        public Double getValue() {
757            return rate;
758        }
759    }
760
761    /**
762     * Add a sampling variable. <p> This method is thread safe.
763     *
764     * @param group timer group.
765     * @param name timer name.
766     * @param period sampling period to compute rate.
767     * @param interval sampling frequency, how often the variable is probed.
768     * @param variable variable to sample.
769     */
770    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
771        if (scheduler == null) {
772            throw new IllegalStateException("scheduler not set, cannot sample");
773        }
774        try {
775            samplerLock.lock();
776            Map<String, Element<Double>> map = samplers.get(group);
777            if (map == null) {
778                map = samplers.get(group);
779                if (map == null) {
780                    map = new HashMap<String, Element<Double>>();
781                    samplers.put(group, map);
782                }
783            }
784            if (map.containsKey(name)) {
785                throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
786            }
787            Sampler sampler = new Sampler(period, interval, variable);
788            map.put(name, sampler);
789            scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
790        }
791        finally {
792            samplerLock.unlock();
793        }
794    }
795
    /**
     * Return all the samplers, organized by group and then by name. <p> The returned map is the live internal map,
     * not a copy. The sampler value is a snapshot at the time the {@link Instrumentation.Element#getValue()} is
     * invoked.
     *
     * @return all samplers.
     */
    public Map<String, Map<String, Element<Double>>> getSamplers() {
        return samplers;
    }
805
    /**
     * Stops the instrumentation. <p> This implementation does nothing.
     */
    public void stop() {

    }
809
810}