/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.service;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Daemon;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLog;

/**
 * This class sets up a simple thread that sleeps for a short period of time. If the sleep takes significantly longer than its
 * target time, it implies that the JVM or host machine has paused processing, which may cause other problems. If such a pause is
 * detected, the thread logs a message and updates the Instrumentation.
 * <p>
 * The monitor thread is created and started by {@link #init(Services)} and stopped by {@link #destroy()}.
 * <p>
 * Adapted from org.apache.hadoop.util.JvmPauseMonitor
 */
public class JvmPauseMonitorService implements Service {

    private static final XLog LOG = XLog.getLog(JvmPauseMonitorService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "JvmPauseMonitorService.";

    /**
     * The target sleep time
     */
    private static final long SLEEP_INTERVAL_MS = 500;

    /**
     * log WARN if we detect a pause longer than this threshold
     */
    private long warnThresholdMs;
    public static final String WARN_THRESHOLD_KEY = CONF_PREFIX + "warn-threshold.ms";

    /**
     * log INFO if we detect a pause longer than this threshold
     */
    private long infoThresholdMs;
    public static final String INFO_THRESHOLD_KEY = CONF_PREFIX + "info-threshold.ms";

    /** The monitoring thread; {@code null} until {@link #init(Services)} has created it. */
    private Thread monitorThread;

    /** Loop flag of the monitor thread; cleared by {@link #destroy()}. */
    private volatile boolean shouldRun = true;

    /** Instrumentation whose "jvm" group counters are incremented by the monitor thread. */
    private Instrumentation instrumentation;

    /**
     * Reads the WARN and INFO thresholds from the configuration, obtains the Instrumentation and starts the monitor thread.
     *
     * @param services the services providing the configuration and the {@link InstrumentationService}
     * @throws ServiceException declared by the {@link Service} interface
     * @throws IllegalStateException if the monitor thread has already been created
     */
    @Override
    public void init(Services services) throws ServiceException {
        // Check first so that a repeated init() does not overwrite the values a running monitor thread is reading.
        Preconditions.checkState(monitorThread == null,
                "Already started");

        warnThresholdMs = ConfigurationService.getLong(services.getConf(), WARN_THRESHOLD_KEY);
        infoThresholdMs = ConfigurationService.getLong(services.getConf(), INFO_THRESHOLD_KEY);

        instrumentation = services.get(InstrumentationService.class).get();

        monitorThread = new Daemon(new Monitor());
        monitorThread.start();
    }

    /**
     * Asks the monitor thread to stop, interrupts its sleep and waits for it to finish. Does nothing beyond clearing the run
     * flag when the monitor thread was never created.
     */
    @Override
    public void destroy() {
        shouldRun = false;
        if (monitorThread == null) {
            // init() never got as far as creating the thread; there is nothing to interrupt or join.
            return;
        }
        monitorThread.interrupt();
        try {
            monitorThread.join();
        } catch (InterruptedException e) {
            // Keep the interrupt status visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the interface under which this service is registered.
     *
     * @return {@code JvmPauseMonitorService.class}
     */
    @Override
    public Class<? extends Service> getInterface() {
        return JvmPauseMonitorService.class;
    }

    /**
     * Builds the log message for a detected pause, listing every GC pool whose collection count changed during the sleep.
     *
     * @param extraSleepTime how much longer than the target interval the sleep took, in milliseconds
     * @param gcTimesAfterSleep GC statistics per pool name, sampled after the sleep
     * @param gcTimesBeforeSleep GC statistics per pool name, sampled before the sleep
     * @return the message; contains "No GCs detected" when no pool present in both samples changed its collection count
     */
    private String formatMessage(long extraSleepTime, Map<String, GcTimes> gcTimesAfterSleep,
            Map<String, GcTimes> gcTimesBeforeSleep) {
        // Only pools present in both samples can be diffed.
        Set<String> gcBeanNames = Sets.intersection(gcTimesAfterSleep.keySet(), gcTimesBeforeSleep.keySet());
        List<String> gcDiffs = Lists.newArrayList();
        for (String name : gcBeanNames) {
            GcTimes diff = gcTimesAfterSleep.get(name).subtract(gcTimesBeforeSleep.get(name));
            if (diff.gcCount != 0) {
                gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff.toString());
            }
        }

        String ret = "Detected pause in JVM or host machine (eg GC): pause of approximately " + extraSleepTime + "ms\n";
        if (gcDiffs.isEmpty()) {
            ret += "No GCs detected";
        } else {
            ret += Joiner.on("\n").join(gcDiffs);
        }
        return ret;
    }

    /**
     * Samples the collection count and collection time of every garbage collector MXBean of this JVM.
     *
     * @return a new map from garbage collector name to its current {@link GcTimes}
     */
    private Map<String, GcTimes> getGcTimes() {
        Map<String, GcTimes> map = Maps.newHashMap();
        List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
        for (GarbageCollectorMXBean gcBean : gcBeans) {
            map.put(gcBean.getName(), new GcTimes(gcBean));
        }
        return map;
    }

    /**
     * Immutable pair of a garbage collection count and the accumulated collection time in milliseconds.
     */
    private static class GcTimes {

        private final long gcCount;
        private final long gcTimeMillis;

        /**
         * Captures the current collection count and collection time of the given bean.
         */
        private GcTimes(GarbageCollectorMXBean gcBean) {
            gcCount = gcBean.getCollectionCount();
            gcTimeMillis = gcBean.getCollectionTime();
        }

        private GcTimes(long count, long time) {
            this.gcCount = count;
            this.gcTimeMillis = time;
        }

        /**
         * Returns the field-wise difference {@code this - other}.
         */
        private GcTimes subtract(GcTimes other) {
            return new GcTimes(this.gcCount - other.gcCount, this.gcTimeMillis - other.gcTimeMillis);
        }

        @Override
        public String toString() {
            return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
        }
    }

    /**
     * The monitoring loop: sleeps for {@link #SLEEP_INTERVAL_MS}, measures how much longer the sleep really took and reports
     * the overshoot through the log and the Instrumentation.
     */
    private class Monitor implements Runnable {

        @Override
        public void run() {
            Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
            while (shouldRun) {
                // System.nanoTime() is used instead of Guava's Stopwatch, whose constructor and elapsedMillis()
                // are not available in all Guava versions.
                long sleepStartNanos = System.nanoTime();
                try {
                    Thread.sleep(SLEEP_INTERVAL_MS);
                } catch (InterruptedException ie) {
                    // destroy() interrupts this thread to end the sleep early; stop monitoring.
                    return;
                }
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sleepStartNanos);
                // The sleep may return marginally early and the millisecond conversion truncates, so the difference
                // can be slightly negative; clamp it so the cumulative counter below is never decremented.
                long extraSleepTime = Math.max(0L, elapsedMs - SLEEP_INTERVAL_MS);
                Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();

                if (extraSleepTime > warnThresholdMs) {
                    LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
                    instrumentation.incr("jvm", "pause.warn-threshold", 1);
                } else if (extraSleepTime > infoThresholdMs) {
                    LOG.info(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
                    instrumentation.incr("jvm", "pause.info-threshold", 1);
                }
                instrumentation.incr("jvm", "pause.extraSleepTime", extraSleepTime);

                // The sample taken after this sleep is the baseline for the next iteration.
                gcTimesBeforeSleep = gcTimesAfterSleep;
            }
        }
    }
}