001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.util; 019 020 import java.io.File; 021 import java.io.FileInputStream; 022 import java.io.IOException; 023 import java.io.InputStream; 024 import java.io.Writer; 025 import java.util.ArrayList; 026 import java.util.Calendar; 027 import java.util.Collections; 028 import java.util.Date; 029 import java.util.HashMap; 030 import java.util.List; 031 import java.util.Map; 032 import java.util.regex.Matcher; 033 import java.util.regex.Pattern; 034 import java.util.zip.GZIPInputStream; 035 036 import com.google.common.annotations.VisibleForTesting; 037 038 /** 039 * XLogStreamer streams the given log file to logWriter after applying the given filter. 040 */ 041 public class XLogStreamer { 042 private static XLog LOG = XLog.getLog(XLogStreamer.class); 043 044 /** 045 * Filter that will construct the regular expression that will be used to filter the log statement. And also checks 046 * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by 047 * "|") jobId appName actionId token 048 */ 049 public static class Filter { 050 private Map<String, Integer> logLevels; 051 private final Map<String, String> filterParams; 052 private static List<String> parameters = new ArrayList<String>(); 053 private boolean noFilter; 054 private Pattern filterPattern; 055 056 // TODO Patterns to be read from config file 057 private static final String DEFAULT_REGEX = "[^\\]]*"; 058 059 public static final String ALLOW_ALL_REGEX = "(.*)"; 060 private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)"; 061 private static final String WHITE_SPACE_REGEX = "\\s+"; 062 private static final String LOG_LEVEL_REGEX = "(\\w+)"; 063 private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX 064 + WHITE_SPACE_REGEX; 065 private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX); 066 067 public Filter() { 068 filterParams = new HashMap<String, String>(); 069 for (int i = 0; i < parameters.size(); i++) { 070 filterParams.put(parameters.get(i), DEFAULT_REGEX); 071 } 072 logLevels = null; 073 noFilter = true; 074 filterPattern = null; 075 } 076 077 public void setLogLevel(String logLevel) { 078 if (logLevel != null && logLevel.trim().length() > 0) { 079 this.logLevels = new HashMap<String, Integer>(); 080 String[] levels = logLevel.split("\\|"); 081 for (int i = 0; i < levels.length; i++) { 082 String s = levels[i].trim().toUpperCase(); 083 try { 084 XLog.Level.valueOf(s); 085 } 086 catch (Exception ex) { 087 continue; 088 } 089 this.logLevels.put(levels[i].toUpperCase(), 1); 090 } 091 } 092 } 093 094 public void setParameter(String filterParam, String value) { 095 if (filterParams.containsKey(filterParam)) { 096 noFilter = false; 097 filterParams.put(filterParam, value); 098 } 099 } 100 101 public static void defineParameter(String filterParam) { 102 parameters.add(filterParam); 103 } 104 105 public boolean isFilterPresent() { 106 if (noFilter && logLevels == null) { 107 return false; 108 } 109 return true; 110 } 111 112 /** 113 * Checks if the logLevel and logMessage goes through the logFilter. 114 * 115 * @param logParts 116 * @return 117 */ 118 public boolean matches(ArrayList<String> logParts) { 119 String logLevel = logParts.get(0); 120 String logMessage = logParts.get(1); 121 if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) { 122 Matcher logMatcher = filterPattern.matcher(logMessage); 123 return logMatcher.matches(); 124 } 125 else { 126 return false; 127 } 128 } 129 130 /** 131 * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing logLevel and 132 * logMessage if the pattern matches i.e A new log statement, else returns null. 133 * 134 * @param logLine 135 * @return Array containing log level and log message 136 */ 137 public ArrayList<String> splitLogMessage(String logLine) { 138 Matcher splitter = SPLITTER_PATTERN.matcher(logLine); 139 if (splitter.matches()) { 140 ArrayList<String> logParts = new ArrayList<String>(); 141 logParts.add(splitter.group(2));// log level 142 logParts.add(splitter.group(3));// Log Message 143 return logParts; 144 } 145 else { 146 return null; 147 } 148 } 149 150 /** 151 * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be 152 * assigned if no filters are set. 153 */ 154 public void constructPattern() { 155 if (noFilter && logLevels == null) { 156 filterPattern = Pattern.compile(ALLOW_ALL_REGEX); 157 return; 158 } 159 StringBuilder sb = new StringBuilder(); 160 if (noFilter) { 161 sb.append("(.*)"); 162 } 163 else { 164 sb.append("(.* "); 165 for (int i = 0; i < parameters.size(); i++) { 166 sb.append(parameters.get(i) + "\\["); 167 sb.append(filterParams.get(parameters.get(i)) + "\\] "); 168 } 169 sb.append(".*)"); 170 } 171 filterPattern = Pattern.compile(sb.toString()); 172 } 173 174 public static void reset() { 175 parameters.clear(); 176 } 177 178 @VisibleForTesting 179 public final Map<String, String> getFilterParams() { 180 return filterParams; 181 } 182 } 183 184 private String logFile; 185 private String logPath; 186 private Filter logFilter; 187 private Writer logWriter; 188 private long logRotation; 189 190 public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) { 191 this.logWriter = logWriter; 192 this.logFilter = logFilter; 193 if (logFile == null) { 194 logFile = "oozie-app.log"; 195 } 196 this.logFile = logFile; 197 this.logPath = logPath; 198 this.logRotation = logRotationSecs * 1000l; 199 } 200 201 /** 202 * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after 203 * applying the filters. 204 * 205 * @param startTime 206 * @param endTime 207 * @throws IOException 208 */ 209 public void streamLog(Date startTime, Date endTime) throws IOException { 210 long startTimeMillis = 0; 211 long endTimeMillis; 212 if (startTime != null) { 213 startTimeMillis = startTime.getTime(); 214 } 215 if (endTime == null) { 216 endTimeMillis = System.currentTimeMillis(); 217 } 218 else { 219 endTimeMillis = endTime.getTime(); 220 } 221 File dir = new File(logPath); 222 ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile); 223 File file; 224 String fileName; 225 XLogReader logReader; 226 for (int i = 0; i < fileList.size(); i++) { 227 fileName = fileList.get(i).getFileName(); 228 if (fileName.endsWith(".gz")) { 229 file = new File(fileName); 230 GZIPInputStream gzipInputStream = null; 231 gzipInputStream = new GZIPInputStream(new FileInputStream(file)); 232 logReader = new XLogReader(gzipInputStream, logFilter, logWriter); 233 logReader.processLog(); 234 logReader.close(); 235 continue; 236 } 237 InputStream ifs; 238 ifs = new FileInputStream(fileName); 239 logReader = new XLogReader(ifs, logFilter, logWriter); 240 logReader.processLog(); 241 ifs.close(); 242 } 243 } 244 245 /** 246 * File name along with the modified time which will be used to sort later. 247 */ 248 class FileInfo implements Comparable<FileInfo> { 249 String fileName; 250 long modTime; 251 252 public FileInfo(String fileName, long modTime) { 253 this.fileName = fileName; 254 this.modTime = modTime; 255 } 256 257 public String getFileName() { 258 return fileName; 259 } 260 261 public long getModTime() { 262 return modTime; 263 } 264 265 public int compareTo(FileInfo fileInfo) { 266 long diff = this.modTime - fileInfo.modTime; 267 if (diff > 0) { 268 return 1; 269 } 270 else if (diff < 0) { 271 return -1; 272 } 273 else { 274 return 0; 275 } 276 } 277 } 278 279 /** 280 * Gets the file list that will have the logs between startTime and endTime. 281 * 282 * @param dir 283 * @param startTime 284 * @param endTime 285 * @param logRotationTime 286 * @param logFile 287 * @return List of files to be streamed 288 */ 289 private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) { 290 String[] children = dir.list(); 291 ArrayList<FileInfo> fileList = new ArrayList<FileInfo>(); 292 if (children == null) { 293 return fileList; 294 } 295 else { 296 for (int i = 0; i < children.length; i++) { 297 String fileName = children[i]; 298 if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) { 299 continue; 300 } 301 File file = new File(dir.getAbsolutePath(), fileName); 302 if (fileName.endsWith(".gz")) { 303 long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime); 304 if (gzFileCreationTime != -1) { 305 fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime)); 306 } 307 continue; 308 } 309 long modTime = file.lastModified(); 310 if (modTime < startTime) { 311 continue; 312 } 313 if (modTime / logRotationTime > (endTime / logRotationTime + 1)) { 314 continue; 315 } 316 fileList.add(new FileInfo(file.getAbsolutePath(), modTime)); 317 } 318 } 319 Collections.sort(fileList); 320 return fileList; 321 } 322 323 /** 324 * This pattern matches the end of a gzip filename to have a format like "-YYYY-MM-dd-HH.gz" with capturing groups for each part 325 * of the date 326 */ 327 public static final Pattern gzTimePattern = Pattern.compile(".*-(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)-(\\d\\d)\\.gz"); 328 329 /** 330 * Returns the creation time of the .gz archive if it is relevant to the job 331 * 332 * @param fileName 333 * @param startTime 334 * @param endTime 335 * @return Modification time of .gz file after checking if it is relevant to the job 336 */ 337 private long getGZFileCreationTime(String fileName, long startTime, long endTime) { 338 // Default return value of -1 to exclude the file 339 long returnVal = -1; 340 341 // Include oozie.log as oozie.log.gz if it is accidentally GZipped 342 if (fileName.equals("oozie.log.gz")) { 343 LOG.warn("oozie.log has been GZipped, which is unexpected"); 344 // Return a value other than -1 to include the file in list 345 returnVal = 0; 346 } else { 347 Matcher m = gzTimePattern.matcher(fileName); 348 if (m.matches() && m.groupCount() == 4) { 349 int year = Integer.parseInt(m.group(1)); 350 int month = Integer.parseInt(m.group(2)); 351 int day = Integer.parseInt(m.group(3)); 352 int hour = Integer.parseInt(m.group(4)); 353 int minute = 0; 354 Calendar calendarEntry = Calendar.getInstance(); 355 calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August) 356 long logFileStartTime = calendarEntry.getTimeInMillis(); 357 long milliSecondsPerHour = 3600000; 358 long logFileEndTime = logFileStartTime + milliSecondsPerHour; 359 /* To check whether the log content is there in the initial or later part of the log file or 360 the log content is contained entirely within this log file or 361 the entire log file contains the event log where the event spans across hours 362 */ 363 if ((startTime >= logFileStartTime && startTime <= logFileEndTime) 364 || (endTime >= logFileStartTime && endTime <= logFileEndTime) 365 || (startTime <= logFileStartTime && endTime >= logFileEndTime)) { 366 returnVal = logFileStartTime; 367 } 368 } else { 369 LOG.debug("Filename " + fileName + " does not match the expected format"); 370 returnVal = -1; 371 } 372 } 373 return returnVal; 374 } 375 }