001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import com.google.common.base.Joiner;
021import com.google.common.base.Preconditions;
022import com.google.common.base.Stopwatch;
023import com.google.common.collect.Lists;
024import com.google.common.collect.Maps;
025import com.google.common.collect.Sets;
026import java.lang.management.GarbageCollectorMXBean;
027import java.lang.management.ManagementFactory;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.util.Daemon;
033import org.apache.oozie.util.Instrumentation;
034import org.apache.oozie.util.XLog;
035
036/**
037 * This class sets up a simple thread that sleeps for a short period of time. If the sleep takes significantly longer than its
038 * target time, it implies that the JVM or host machine has paused processing, which may cause other problems. If such a pause is
039 * detected, the thread logs a message and updates the Instrumentation.
040 *
041 * Adapted from org.apache.hadoop.util.JvmPauseMonitor
042 */
043public class JvmPauseMonitorService implements Service {
044
045    private static XLog LOG = XLog.getLog(JvmPauseMonitorService.class);
046
047    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JvmPauseMonitorService.";
048
049    /**
050     * The target sleep time
051     */
052    private static final long SLEEP_INTERVAL_MS = 500;
053
054    /**
055     * log WARN if we detect a pause longer than this threshold
056     */
057    private long warnThresholdMs;
058    private static final String WARN_THRESHOLD_KEY = CONF_PREFIX + "warn-threshold.ms";
059    private static final long WARN_THRESHOLD_DEFAULT = 10000;
060
061    /**
062     * log INFO if we detect a pause longer than this threshold
063     */
064    private long infoThresholdMs;
065    private static final String INFO_THRESHOLD_KEY = CONF_PREFIX + "info-threshold.ms";
066    private static final long INFO_THRESHOLD_DEFAULT = 1000;
067
068    private Thread monitorThread;
069    private volatile boolean shouldRun = true;
070    private Instrumentation instrumentation;
071
072    @Override
073    public void init(Services services) throws ServiceException {
074        Configuration conf = services.getConf();
075        warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
076        infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
077
078        instrumentation = services.get(InstrumentationService.class).get();
079
080        Preconditions.checkState(monitorThread == null,
081                "Already started");
082        monitorThread = new Daemon(new Monitor());
083        monitorThread.start();
084    }
085
086    @Override
087    public void destroy() {
088        shouldRun = false;
089        monitorThread.interrupt();
090        try {
091            monitorThread.join();
092        } catch (InterruptedException e) {
093            Thread.currentThread().interrupt();
094        }
095    }
096
097    @Override
098    public Class<? extends Service> getInterface() {
099        return JvmPauseMonitorService.class;
100    }
101
102    private String formatMessage(long extraSleepTime, Map<String, GcTimes> gcTimesAfterSleep,
103            Map<String, GcTimes> gcTimesBeforeSleep) {
104        Set<String> gcBeanNames = Sets.intersection(gcTimesAfterSleep.keySet(), gcTimesBeforeSleep.keySet());
105        List<String> gcDiffs = Lists.newArrayList();
106        for (String name : gcBeanNames) {
107            GcTimes diff = gcTimesAfterSleep.get(name).subtract(gcTimesBeforeSleep.get(name));
108            if (diff.gcCount != 0) {
109                gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff.toString());
110            }
111        }
112
113        String ret = "Detected pause in JVM or host machine (eg GC): pause of approximately " + extraSleepTime + "ms\n";
114        if (gcDiffs.isEmpty()) {
115            ret += "No GCs detected";
116        } else {
117            ret += Joiner.on("\n").join(gcDiffs);
118        }
119        return ret;
120    }
121
122    private Map<String, GcTimes> getGcTimes() {
123        Map<String, GcTimes> map = Maps.newHashMap();
124        List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
125        for (GarbageCollectorMXBean gcBean : gcBeans) {
126            map.put(gcBean.getName(), new GcTimes(gcBean));
127        }
128        return map;
129    }
130
131    private static class GcTimes {
132
133        private GcTimes(GarbageCollectorMXBean gcBean) {
134            gcCount = gcBean.getCollectionCount();
135            gcTimeMillis = gcBean.getCollectionTime();
136        }
137
138        private GcTimes(long count, long time) {
139            this.gcCount = count;
140            this.gcTimeMillis = time;
141        }
142
143        private GcTimes subtract(GcTimes other) {
144            return new GcTimes(this.gcCount - other.gcCount, this.gcTimeMillis - other.gcTimeMillis);
145        }
146
147        @Override
148        public String toString() {
149            return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
150        }
151
152        private long gcCount;
153        private long gcTimeMillis;
154    }
155
156    private class Monitor implements Runnable {
157
158        @Override
159        public void run() {
160            Stopwatch sw = new Stopwatch();
161            Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
162            while (shouldRun) {
163                sw.reset().start();
164                try {
165                    Thread.sleep(SLEEP_INTERVAL_MS);
166                } catch (InterruptedException ie) {
167                    return;
168                }
169                long extraSleepTime = sw.elapsedMillis() - SLEEP_INTERVAL_MS;
170                Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
171
172                if (extraSleepTime > warnThresholdMs) {
173                    LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
174                    instrumentation.incr("jvm", "pause.warn-threshold", 1);
175                } else if (extraSleepTime > infoThresholdMs) {
176                    LOG.info(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
177                    instrumentation.incr("jvm", "pause.info-threshold", 1);
178                }
179                instrumentation.incr("jvm", "pause.extraSleepTime", extraSleepTime);
180
181                gcTimesBeforeSleep = gcTimesAfterSleep;
182            }
183        }
184    }
185}