/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.util;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;

/**
 * XLogStreamer streams the given log file to logWriter after applying the given filter.
 */
public class XLogStreamer {

    /**
     * Filter that constructs the regular expression used to filter log statements, and checks whether a given log
     * message passes the filter. Filters that can be used are logLevel (multiple values separated by "|"), jobId,
     * appName, actionId and token.
     * <p/>
     * Typical usage: register the parameter names with {@link #defineParameter(String)}, create a Filter, set the
     * wanted values, call {@link #constructPattern()} and then use {@link #splitLogMessage(String)} and
     * {@link #matches(ArrayList)} on each log line.
     */
    public static class Filter {
        private Map<String, Integer> logLevels;
        private Map<String, String> filterParams;
        // Shared by all Filter instances; populated through defineParameter() and emptied by reset().
        private static List<String> parameters = new ArrayList<String>();
        private boolean noFilter;
        private Pattern filterPattern;

        // TODO Patterns to be read from config file
        private static final String DEFAULT_REGEX = "[^\\]]*";

        public static final String ALLOW_ALL_REGEX = "(.*)";
        private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
        private static final String WHITE_SPACE_REGEX = "\\s+";
        private static final String LOG_LEVEL_REGEX = "(\\w+)";
        private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
                + WHITE_SPACE_REGEX;
        private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);

        /**
         * Creates a filter in which every currently defined parameter accepts any value and no log level
         * restriction is set.
         */
        public Filter() {
            filterParams = new HashMap<String, String>();
            for (String parameter : parameters) {
                filterParams.put(parameter, DEFAULT_REGEX);
            }
            logLevels = null;
            noFilter = true;
            filterPattern = null;
        }

        /**
         * Restricts the filter to the given log levels.
         *
         * @param logLevel one or more level names separated by "|"; surrounding white space and case are ignored.
         *        Names that are not valid {@link XLog.Level} values are skipped. A null or blank value leaves the
         *        filter unchanged.
         */
        public void setLogLevel(String logLevel) {
            if (logLevel != null && logLevel.trim().length() > 0) {
                this.logLevels = new HashMap<String, Integer>();
                String[] levels = logLevel.split("\\|");
                for (String level : levels) {
                    String normalized = level.trim().toUpperCase();
                    try {
                        XLog.Level.valueOf(normalized);
                    }
                    catch (Exception ignored) {
                        // Not a known log level: skip it rather than failing the whole request.
                        continue;
                    }
                    // Store the validated (trimmed) name; matches() looks levels up by their trimmed upper-case
                    // form, so storing the raw token would make "INFO | WARN" match nothing.
                    this.logLevels.put(normalized, 1);
                }
            }
        }

        /**
         * Sets the regular expression a defined parameter has to match. Unknown parameter names are ignored.
         *
         * @param filterParam name of a parameter previously registered with {@link #defineParameter(String)}
         * @param value regular expression fragment the parameter value has to match
         */
        public void setParameter(String filterParam, String value) {
            if (filterParams.containsKey(filterParam)) {
                noFilter = false;
                filterParams.put(filterParam, value);
            }
        }

        /**
         * Registers a parameter name for all Filter instances created afterwards.
         *
         * @param filterParam parameter name as it appears in the log line prefix
         */
        public static void defineParameter(String filterParam) {
            parameters.add(filterParam);
        }

        /**
         * @return true if a parameter filter or a log level filter has been set, false otherwise
         */
        public boolean isFilterPresent() {
            return !(noFilter && logLevels == null);
        }

        /**
         * Checks if the logLevel and logMessage go through the logFilter. {@link #constructPattern()} must have
         * been called before this method is used.
         *
         * @param logParts log level at index 0 and log message at index 1, as returned by
         *        {@link #splitLogMessage(String)}
         * @return true if the level is accepted and the message matches the filter pattern
         */
        public boolean matches(ArrayList<String> logParts) {
            String logLevel = logParts.get(0);
            String logMessage = logParts.get(1);
            if (this.logLevels != null && !this.logLevels.containsKey(logLevel.toUpperCase())) {
                return false;
            }
            Matcher logMatcher = filterPattern.matcher(logMessage);
            return logMatcher.matches();
        }

        /**
         * Splits the log line into timestamp, logLevel and remaining log message. Returns a list containing
         * logLevel and logMessage if the pattern matches, i.e. the line starts a new log statement, else returns
         * null.
         *
         * @param logLine raw line read from the log file
         * @return list containing log level and log message, or null if the line does not start a log statement
         */
        public ArrayList<String> splitLogMessage(String logLine) {
            Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
            if (!splitter.matches()) {
                return null;
            }
            ArrayList<String> logParts = new ArrayList<String>();
            logParts.add(splitter.group(2)); // log level
            logParts.add(splitter.group(3)); // log message
            return logParts;
        }

        /**
         * Constructs the regular expression according to the filter and assigns it to filterPattern. ".*" will be
         * assigned if no filters are set.
         */
        public void constructPattern() {
            if (noFilter && logLevels == null) {
                filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
                return;
            }
            StringBuilder sb = new StringBuilder();
            if (noFilter) {
                // Only a log level restriction is set: every message text is acceptable.
                sb.append("(.*)");
            }
            else {
                // Expected message shape: "<anything> - name1[value1] name2[value2] ... <anything>"
                sb.append("(.* - ");
                for (String parameter : parameters) {
                    sb.append(parameter).append("\\[");
                    sb.append(filterParams.get(parameter)).append("\\] ");
                }
                sb.append(".*)");
            }
            filterPattern = Pattern.compile(sb.toString());
        }

        /**
         * Removes all parameter names registered with {@link #defineParameter(String)}.
         */
        public static void reset() {
            parameters.clear();
        }
    }

    private String logFile;
    private String logPath;
    private Filter logFilter;
    private Writer logWriter;
    private long logRotation;

    /**
     * Creates a streamer.
     *
     * @param logFilter filter applied to every log statement
     * @param logWriter writer the accepted log statements are handed to
     * @param logPath directory containing the log files
     * @param logFile base name of the log file; "oozie-app.log" is used when null
     * @param logRotationSecs log rotation interval in seconds
     */
    public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) {
        this.logWriter = logWriter;
        this.logFilter = logFilter;
        this.logFile = (logFile == null) ? "oozie-app.log" : logFile;
        this.logPath = logPath;
        this.logRotation = logRotationSecs * 1000L;
    }

    /**
     * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
     * applying the filters.
     *
     * @param startTime start of the interval of interest; null means no lower bound
     * @param endTime end of the interval of interest; null means the current time
     * @throws IOException if a log file cannot be opened or read
     */
    public void streamLog(Date startTime, Date endTime) throws IOException {
        long startTimeMillis = (startTime != null) ? startTime.getTime() : 0;
        long endTimeMillis = (endTime != null) ? endTime.getTime() : System.currentTimeMillis();

        File dir = new File(logPath);
        ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
        for (FileInfo fileInfo : fileList) {
            String fileName = fileInfo.getFileName();
            InputStream fileStream = new FileInputStream(fileName);
            try {
                if (fileName.endsWith(".gz")) {
                    GZIPInputStream gzipInputStream = new GZIPInputStream(fileStream);
                    try {
                        XLogReader logReader = new XLogReader(gzipInputStream, logFilter, logWriter);
                        logReader.processLog();
                        logReader.close();
                    }
                    finally {
                        gzipInputStream.close();
                    }
                }
                else {
                    XLogReader logReader = new XLogReader(fileStream, logFilter, logWriter);
                    logReader.processLog();
                }
            }
            finally {
                // Always release the file handle, also when reading fails or the gzip header is invalid.
                fileStream.close();
            }
        }
    }

    /**
     * File name along with the modified time which will be used to sort later.
     */
    class FileInfo implements Comparable<FileInfo> {
        String fileName;
        long modTime;

        public FileInfo(String fileName, long modTime) {
            this.fileName = fileName;
            this.modTime = modTime;
        }

        public String getFileName() {
            return fileName;
        }

        public long getModTime() {
            return modTime;
        }

        /**
         * Orders FileInfo instances by ascending modification time.
         */
        public int compareTo(FileInfo fileInfo) {
            // Compare directly instead of subtracting, which could overflow for extreme values.
            if (this.modTime > fileInfo.modTime) {
                return 1;
            }
            if (this.modTime < fileInfo.modTime) {
                return -1;
            }
            return 0;
        }
    }

    /**
     * Gets the file list that will have the logs between startTime and endTime.
     *
     * @param dir directory to scan
     * @param startTime start of the interval of interest, in milliseconds
     * @param endTime end of the interval of interest, in milliseconds
     * @param logRotationTime log rotation interval, in milliseconds
     * @param logFile base name of the log file; only files whose name starts with it are considered
     * @return List of files to be streamed, sorted by ascending time; empty if the directory cannot be listed
     */
    private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime,
            String logFile) {
        ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
        String[] children = dir.list();
        if (children == null) {
            return fileList;
        }
        for (String fileName : children) {
            if (!fileName.startsWith(logFile)) {
                continue;
            }
            File file = new File(dir.getAbsolutePath(), fileName);
            if (fileName.endsWith(".gz")) {
                // Rolled archives are selected by the hour encoded in their name, not by modification time.
                long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime, logFile);
                if (gzFileCreationTime != -1) {
                    fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime));
                }
                continue;
            }
            long modTime = file.lastModified();
            if (modTime < startTime) {
                continue;
            }
            // Skip files last written more than one rotation period after the period containing endTime.
            if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
                continue;
            }
            fileList.add(new FileInfo(file.getAbsolutePath(), modTime));
        }
        Collections.sort(fileList);
        return fileList;
    }

    /**
     * Returns the creation time of the .gz archive if it is relevant to the job. The base log file name configured
     * for this streamer is used to locate the date in the archive name.
     *
     * @param fileName name (without directory) of the .gz archive
     * @param startTime start of the interval of interest, in milliseconds
     * @param endTime end of the interval of interest, in milliseconds
     * @return start time of the hour covered by the .gz file, or -1 if the file is not relevant to the job
     */
    private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
        return getGZFileCreationTime(fileName, startTime, endTime, logFile);
    }

    /**
     * Returns the creation time of the .gz archive if it is relevant to the job. The archive name is expected to be
     * the base log file name followed by one separator character and "yyyy-MM-dd-HH", then ".gz". Archives whose
     * name does not follow this layout are treated as not relevant.
     *
     * @param fileName name (without directory) of the .gz archive
     * @param startTime start of the interval of interest, in milliseconds
     * @param endTime end of the interval of interest, in milliseconds
     * @param logFile base name of the log file the archive was rolled from
     * @return start time of the hour covered by the .gz file, or -1 if the file is not relevant to the job
     */
    private long getGZFileCreationTime(String fileName, long startTime, long endTime, String logFile) {
        // The date starts after the base name and one separator character; ".gz" (3 characters) ends the name.
        int dateStartIndex = logFile.length() + 1;
        int dateEndIndex = fileName.length() - 3;
        if (dateStartIndex >= dateEndIndex) {
            return -1;
        }
        String[] dateDetails = fileName.substring(dateStartIndex, dateEndIndex).split("-");
        if (dateDetails.length < 4) {
            return -1;
        }
        int year;
        int month;
        int day;
        int hour;
        try {
            year = Integer.parseInt(dateDetails[0]);
            month = Integer.parseInt(dateDetails[1]);
            day = Integer.parseInt(dateDetails[2]);
            hour = Integer.parseInt(dateDetails[3]);
        }
        catch (NumberFormatException ex) {
            // Not a rolled archive of the expected form: exclude it instead of aborting the whole stream.
            return -1;
        }
        int minute = 0;
        Calendar calendarEntry = Calendar.getInstance();
        // Reset all fields first so that seconds and milliseconds do not carry over from the current time.
        calendarEntry.clear();
        calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
        long logFileStartTime = calendarEntry.getTimeInMillis();
        long milliSecondsPerHour = 3600000;
        long logFileEndTime = logFileStartTime + milliSecondsPerHour;
        boolean startsInsideFile = startTime >= logFileStartTime && startTime <= logFileEndTime;
        boolean endsInsideFile = endTime >= logFileStartTime && endTime <= logFileEndTime;
        // Without this case the archives between the first and the last hour of a long job would be dropped.
        boolean coversWholeFile = startTime <= logFileStartTime && endTime >= logFileEndTime;
        if (startsInsideFile || endsInsideFile || coversWholeFile) {
            return logFileStartTime;
        }
        return -1;
    }
}