/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Daemon;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLog;

/**
 * Runs a daemon thread that repeatedly sleeps for a short, fixed interval. If a sleep takes significantly longer than
 * its target time, the JVM or host machine has presumably paused processing (for example during garbage collection),
 * which may cause other problems. When such a pause exceeds a configured threshold, the thread logs a message that
 * includes any GC activity observed during the pause and updates the {@link Instrumentation} counters.
 *
 * Adapted from org.apache.hadoop.util.JvmPauseMonitor
 */
public class JvmPauseMonitorService implements Service {

    private static final XLog LOG = XLog.getLog(JvmPauseMonitorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JvmPauseMonitorService.";

    /**
     * The target duration of each sleep performed by the monitor thread, in milliseconds.
     */
    private static final long SLEEP_INTERVAL_MS = 500;

    /**
     * Log at WARN if a detected pause is longer than this many milliseconds.
     */
    private long warnThresholdMs;
    private static final String WARN_THRESHOLD_KEY = CONF_PREFIX + "warn-threshold.ms";
    private static final long WARN_THRESHOLD_DEFAULT = 10000;

    /**
     * Log at INFO if a detected pause is longer than this many milliseconds but does not exceed the WARN threshold.
     */
    private long infoThresholdMs;
    private static final String INFO_THRESHOLD_KEY = CONF_PREFIX + "info-threshold.ms";
    private static final long INFO_THRESHOLD_DEFAULT = 1000;

    private Thread monitorThread;
    private volatile boolean shouldRun = true;
    private Instrumentation instrumentation;

    /**
     * Reads the pause thresholds from the configuration and starts the monitor daemon thread.
     *
     * @param services the services container supplying the configuration and the InstrumentationService
     * @throws ServiceException as declared by the {@link Service} contract
     * @throws IllegalStateException if this instance's monitor thread is already running
     */
    @Override
    public void init(Services services) throws ServiceException {
        // Check first so a duplicate init() does not clobber the running thread's thresholds.
        Preconditions.checkState(monitorThread == null, "Already started");

        Configuration conf = services.getConf();
        warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
        infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);

        instrumentation = services.get(InstrumentationService.class).get();

        // A previous destroy() leaves the flag false; without resetting it the new thread would exit immediately.
        shouldRun = true;
        monitorThread = new Daemon(new Monitor());
        monitorThread.start();
    }

    /**
     * Stops the monitor thread and waits for it to exit. Safe to call when init() did not start the thread.
     * If the calling thread is interrupted while waiting, its interrupt status is restored.
     */
    @Override
    public void destroy() {
        shouldRun = false;
        Thread thread = monitorThread;
        if (thread == null) {
            // init() never started a thread (or destroy() already ran); nothing to stop.
            return;
        }
        // Interrupting wakes the monitor out of Thread.sleep so shutdown does not wait a full interval.
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Clear the reference so the service can be initialized again.
        monitorThread = null;
    }

    @Override
    public Class<? extends Service> getInterface() {
        return JvmPauseMonitorService.class;
    }

    /**
     * Builds the log message for a detected pause.
     *
     * @param extraSleepTime how many milliseconds the sleep overran its target
     * @param gcTimesAfterSleep GC counters sampled after the sleep, keyed by GC bean name
     * @param gcTimesBeforeSleep GC counters sampled before the sleep, keyed by GC bean name
     * @return a multi-line message listing every GC pool whose collection count changed during the pause,
     *         or "No GCs detected" if none did
     */
    private String formatMessage(long extraSleepTime, Map<String, GcTimes> gcTimesAfterSleep,
            Map<String, GcTimes> gcTimesBeforeSleep) {
        // Only pools present in both samples can be diffed.
        Set<String> gcBeanNames = Sets.intersection(gcTimesAfterSleep.keySet(), gcTimesBeforeSleep.keySet());
        List<String> gcDiffs = Lists.newArrayList();
        for (String name : gcBeanNames) {
            GcTimes diff = gcTimesAfterSleep.get(name).subtract(gcTimesBeforeSleep.get(name));
            if (diff.gcCount != 0) {
                gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff.toString());
            }
        }

        String ret = "Detected pause in JVM or host machine (eg GC): pause of approximately " + extraSleepTime + "ms\n";
        if (gcDiffs.isEmpty()) {
            ret += "No GCs detected";
        } else {
            ret += Joiner.on("\n").join(gcDiffs);
        }
        return ret;
    }

    /**
     * Samples the cumulative collection count and time of every garbage collector MXBean.
     *
     * @return a map from GC bean name to its current counters
     */
    private Map<String, GcTimes> getGcTimes() {
        Map<String, GcTimes> map = Maps.newHashMap();
        List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
        for (GarbageCollectorMXBean gcBean : gcBeans) {
            map.put(gcBean.getName(), new GcTimes(gcBean));
        }
        return map;
    }

    /**
     * Immutable snapshot (or difference) of a garbage collector's collection count and accumulated collection time.
     */
    private static class GcTimes {

        private final long gcCount;
        private final long gcTimeMillis;

        private GcTimes(GarbageCollectorMXBean gcBean) {
            this(gcBean.getCollectionCount(), gcBean.getCollectionTime());
        }

        private GcTimes(long count, long time) {
            this.gcCount = count;
            this.gcTimeMillis = time;
        }

        /**
         * @return the element-wise difference {@code this - other}
         */
        private GcTimes subtract(GcTimes other) {
            return new GcTimes(this.gcCount - other.gcCount, this.gcTimeMillis - other.gcTimeMillis);
        }

        @Override
        public String toString() {
            return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
        }
    }

    /**
     * The monitor loop. It sleeps for SLEEP_INTERVAL_MS, measures any overrun, and logs and records pauses that
     * exceed the configured thresholds. It runs until shouldRun is cleared or the thread is interrupted.
     */
    private class Monitor implements Runnable {

        @Override
        public void run() {
            Stopwatch sw = new Stopwatch();
            Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
            while (shouldRun) {
                sw.reset().start();
                try {
                    Thread.sleep(SLEEP_INTERVAL_MS);
                } catch (InterruptedException ie) {
                    // destroy() interrupts this thread to stop it; preserve the status and exit.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Thread.sleep can return marginally early on coarse system timers. Clamp so a negative
                // "pause" never decrements the cumulative extraSleepTime counter.
                long extraSleepTime = Math.max(0L, sw.elapsedMillis() - SLEEP_INTERVAL_MS);
                Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();

                if (extraSleepTime > warnThresholdMs) {
                    LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
                    instrumentation.incr("jvm", "pause.warn-threshold", 1);
                } else if (extraSleepTime > infoThresholdMs) {
                    LOG.info(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
                    instrumentation.incr("jvm", "pause.info-threshold", 1);
                }
                instrumentation.incr("jvm", "pause.extraSleepTime", extraSleepTime);

                gcTimesBeforeSleep = gcTimesAfterSleep;
            }
        }
    }
}