/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;

/**
 * XLogStreamer streams the given log file to logWriter after applying the given filter.
 */
public class XLogStreamer {
    private static final XLog LOG = XLog.getLog(XLogStreamer.class);

    /**
     * Filter that constructs the regular expression used to filter log statements and checks whether a given log
     * message goes through the filter. Filters that can be used are logLevel (multiple values separated by "|"),
     * jobId, appName, actionId and token.
     * <p>
     * The set of known parameter names is static and shared by all Filter instances; it is populated through
     * {@link #defineParameter(String)} and emptied through {@link #reset()}.
     */
    public static class Filter {
        private Map<String, Integer> logLevels;
        private Map<String, String> filterParams;
        private static final List<String> parameters = new ArrayList<String>();
        private boolean noFilter;
        private Pattern filterPattern;

        // TODO Patterns to be read from config file
        private static final String DEFAULT_REGEX = "[^\\]]*";

        public static final String ALLOW_ALL_REGEX = "(.*)";
        private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
        private static final String WHITE_SPACE_REGEX = "\\s+";
        private static final String LOG_LEVEL_REGEX = "(\\w+)";
        private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
                + WHITE_SPACE_REGEX;
        private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);

        /**
         * Creates a filter in which every parameter defined so far matches any value, no log level restriction is
         * set and no pattern has been compiled yet.
         */
        public Filter() {
            filterParams = new HashMap<String, String>();
            for (int i = 0; i < parameters.size(); i++) {
                filterParams.put(parameters.get(i), DEFAULT_REGEX);
            }
            logLevels = null;
            noFilter = true;
            filterPattern = null;
        }

        /**
         * Restricts the filter to the given log levels.
         *
         * @param logLevel one or more level names separated by "|"; surrounding white space and case are ignored
         *        and names that are not valid {@code XLog.Level} values are skipped. A null or blank value leaves
         *        the filter unchanged.
         */
        public void setLogLevel(String logLevel) {
            if (logLevel != null && logLevel.trim().length() > 0) {
                this.logLevels = new HashMap<String, Integer>();
                String[] levels = logLevel.split("\\|");
                for (int i = 0; i < levels.length; i++) {
                    // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
                    String level = levels[i].trim().toUpperCase(Locale.ROOT);
                    try {
                        XLog.Level.valueOf(level);
                    }
                    catch (Exception ex) {
                        // Not a known level name: ignore it.
                        continue;
                    }
                    // Store the trimmed name that was just validated so that matches() can find it.
                    this.logLevels.put(level, 1);
                }
            }
        }

        /**
         * Sets the regular expression a defined parameter has to match. Names that have not been defined through
         * {@link #defineParameter(String)} before this filter was created are ignored.
         *
         * @param filterParam parameter name
         * @param value regular expression for the parameter value
         */
        public void setParameter(String filterParam, String value) {
            if (filterParams.containsKey(filterParam)) {
                noFilter = false;
                filterParams.put(filterParam, value);
            }
        }

        /**
         * Adds a parameter name to the static list of parameters used by filters created afterwards.
         *
         * @param filterParam parameter name
         */
        public static void defineParameter(String filterParam) {
            parameters.add(filterParam);
        }

        /**
         * @return true if a parameter value or a log level restriction has been set on this filter
         */
        public boolean isFilterPresent() {
            return !(noFilter && logLevels == null);
        }

        /**
         * Checks if the logLevel and logMessage go through the logFilter. {@link #constructPattern()} must have
         * been called before, otherwise no pattern is available.
         *
         * @param logParts log level at index 0 and log message at index 1, as returned by
         *        {@link #splitLogMessage(String)}
         * @return true if the level is accepted and the message matches the filter pattern
         */
        public boolean matches(ArrayList<String> logParts) {
            String logLevel = logParts.get(0);
            String logMessage = logParts.get(1);
            if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase(Locale.ROOT))) {
                Matcher logMatcher = filterPattern.matcher(logMessage);
                return logMatcher.matches();
            }
            else {
                return false;
            }
        }

        /**
         * Splits the log line into timestamp, logLevel and remaining log message. Returns a list containing
         * logLevel and logMessage if the pattern matches, i.e. the line starts a new log statement, else returns
         * null.
         *
         * @param logLine line to split
         * @return list containing log level and log message, or null
         */
        public ArrayList<String> splitLogMessage(String logLine) {
            Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
            if (splitter.matches()) {
                ArrayList<String> logParts = new ArrayList<String>();
                logParts.add(splitter.group(2)); // log level (group 1 is the timestamp)
                logParts.add(splitter.group(3)); // log message
                return logParts;
            }
            else {
                return null;
            }
        }

        /**
         * Constructs the regular expression according to the filter and assigns it to filterPattern. "(.*)" will
         * be assigned if no parameter values are set.
         */
        public void constructPattern() {
            if (noFilter) {
                filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
                return;
            }
            // Expected message layout: "<anything> NAME[value] NAME[value] ... <anything>"
            StringBuilder sb = new StringBuilder();
            sb.append("(.* ");
            for (int i = 0; i < parameters.size(); i++) {
                String name = parameters.get(i);
                sb.append(name).append("\\[");
                sb.append(filterParams.get(name)).append("\\] ");
            }
            sb.append(".*)");
            filterPattern = Pattern.compile(sb.toString());
        }

        /**
         * Removes all parameter names from the static parameter list.
         */
        public static void reset() {
            parameters.clear();
        }
    }

    private String logFile;
    private String logPath;
    private Filter logFilter;
    private Writer logWriter;
    private long logRotation;

    /**
     * @param logFilter filter applied to the log statements
     * @param logWriter writer the matching statements are streamed to
     * @param logPath directory containing the log files
     * @param logFile base name of the log files; "oozie-app.log" is used if null
     * @param logRotationSecs log rotation interval in seconds
     */
    public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) {
        this.logWriter = logWriter;
        this.logFilter = logFilter;
        if (logFile == null) {
            logFile = "oozie-app.log";
        }
        this.logFile = logFile;
        this.logPath = logPath;
        this.logRotation = logRotationSecs * 1000L;
    }

    /**
     * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
     * applying the filters.
     *
     * @param startTime start of the interval; null means no lower bound
     * @param endTime end of the interval; null means the current time
     * @throws IOException if a log file cannot be opened or read
     */
    public void streamLog(Date startTime, Date endTime) throws IOException {
        long startTimeMillis = (startTime != null) ? startTime.getTime() : 0;
        long endTimeMillis = (endTime != null) ? endTime.getTime() : System.currentTimeMillis();
        File dir = new File(logPath);
        ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
        for (int i = 0; i < fileList.size(); i++) {
            streamFile(fileList.get(i).getFileName());
        }
    }

    /**
     * Streams a single log file through the filter, decompressing it first if its name ends with ".gz". The
     * underlying stream is closed even if opening the gzip stream or processing the log fails.
     *
     * @param fileName absolute path of the log file
     * @throws IOException if the file cannot be opened or read
     */
    private void streamFile(String fileName) throws IOException {
        InputStream in = new FileInputStream(fileName);
        try {
            if (fileName.endsWith(".gz")) {
                // If this constructor throws, 'in' still refers to the file stream and is closed below.
                in = new GZIPInputStream(in);
            }
            XLogReader logReader = new XLogReader(in, logFilter, logWriter);
            logReader.processLog();
        }
        finally {
            in.close();
        }
    }

    /**
     * File name along with the modified time which will be used to sort later.
     */
    class FileInfo implements Comparable<FileInfo> {
        String fileName;
        long modTime;

        public FileInfo(String fileName, long modTime) {
            this.fileName = fileName;
            this.modTime = modTime;
        }

        public String getFileName() {
            return fileName;
        }

        public long getModTime() {
            return modTime;
        }

        /**
         * Orders FileInfo instances by ascending modification time.
         */
        public int compareTo(FileInfo fileInfo) {
            if (this.modTime > fileInfo.modTime) {
                return 1;
            }
            else if (this.modTime < fileInfo.modTime) {
                return -1;
            }
            else {
                return 0;
            }
        }
    }

    /**
     * Gets the file list that will have the logs between startTime and endTime.
     *
     * @param dir directory to scan
     * @param startTime start of the interval in milliseconds
     * @param endTime end of the interval in milliseconds
     * @param logRotationTime log rotation interval in milliseconds
     * @param logFile base name the log file names have to start with
     * @return List of files to be streamed, sorted by time; empty if the directory cannot be listed
     */
    private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime,
            String logFile) {
        String[] children = dir.list();
        ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
        if (children == null) {
            return fileList;
        }
        for (int i = 0; i < children.length; i++) {
            String fileName = children[i];
            if (!fileName.startsWith(logFile)) {
                continue;
            }
            File file = new File(dir.getAbsolutePath(), fileName);
            if (fileName.endsWith(".gz")) {
                // Archived logs are selected by the hour encoded in their name, not by modification time.
                long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime);
                if (gzFileCreationTime != -1) {
                    fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime));
                }
                continue;
            }
            long modTime = file.lastModified();
            if (modTime < startTime) {
                continue;
            }
            // Skip files last modified more than one rotation period after the period containing endTime.
            if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
                continue;
            }
            fileList.add(new FileInfo(file.getAbsolutePath(), modTime));
        }
        Collections.sort(fileList);
        return fileList;
    }

    /**
     * This pattern matches the end of a gzip filename to have a format like "-YYYY-MM-dd-HH.gz" with capturing groups
     * for each part of the date
     */
    public static final Pattern gzTimePattern = Pattern.compile(".*-(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)-(\\d\\d)\\.gz");

    /**
     * Returns the creation time of the .gz archive if it is relevant to the job
     *
     * @param fileName name of the .gz file (without directory)
     * @param startTime start of the interval in milliseconds
     * @param endTime end of the interval in milliseconds
     * @return start of the hour encoded in the file name if that hour overlaps the interval, 0 for "oozie.log.gz",
     *         or -1 if the file is to be excluded
     */
    private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
        // Include oozie.log as oozie.log.gz if it is accidentally GZipped
        if (fileName.equals("oozie.log.gz")) {
            LOG.warn("oozie.log has been GZipped, which is unexpected");
            // Return a value other than -1 to include the file in list
            return 0;
        }

        Matcher m = gzTimePattern.matcher(fileName);
        if (!m.matches()) {
            LOG.debug("Filename " + fileName + " does not match the expected format");
            return -1;
        }

        int year = Integer.parseInt(m.group(1));
        int month = Integer.parseInt(m.group(2));
        int day = Integer.parseInt(m.group(3));
        int hour = Integer.parseInt(m.group(4));
        int minute = 0;
        Calendar calendarEntry = Calendar.getInstance();
        // getInstance() is initialised with the current time; clear it so that the seconds and milliseconds of
        // "now" do not leak into the start of the hour computed below.
        calendarEntry.clear();
        calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
        long logFileStartTime = calendarEntry.getTimeInMillis();
        long milliSecondsPerHour = 3600000;
        long logFileEndTime = logFileStartTime + milliSecondsPerHour;

        /* To check whether the log content is there in the initial or later part of the log file or
           the log content is contained entirely within this log file or
           the entire log file contains the event log where the event spans across hours
        */
        if ((startTime >= logFileStartTime && startTime <= logFileEndTime)
                || (endTime >= logFileStartTime && endTime <= logFileEndTime)
                || (startTime <= logFileStartTime && endTime >= logFileEndTime)) {
            return logFileStartTime;
        }
        // Default return value of -1 to exclude the file
        return -1;
    }
}