001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.util; 020 021import java.io.File; 022import java.io.IOException; 023import java.io.Writer; 024import java.util.ArrayList; 025import java.util.Calendar; 026import java.util.Collections; 027import java.util.Date; 028import java.util.Map; 029import java.util.regex.Matcher; 030import java.util.regex.Pattern; 031import java.io.BufferedReader; 032 033import org.apache.commons.lang.StringUtils; 034import org.apache.oozie.client.rest.RestConstants; 035import org.apache.oozie.command.CommandException; 036import org.apache.oozie.service.ConfigurationService; 037import org.apache.oozie.service.Service; 038import org.apache.oozie.service.Services; 039import org.apache.oozie.service.XLogService; 040 041/** 042 * XLogStreamer streams the given log file to writer after applying the given filter. 043 */ 044public class XLogStreamer { 045 private static XLog LOG = XLog.getLog(XLogStreamer.class); 046 protected static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService."; 047 public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len"; 048 049 private String logFile; 050 private String logPath; 051 protected XLogFilter logFilter; 052 private long logRotation; 053 Map<String, String[]> requestParam; 054 protected int totalDataWritten; 055 protected int bufferLen; 056 057 public XLogStreamer(XLogFilter logFilter, String logPath, String logFile, long logRotationSecs) { 058 if (logFile == null) { 059 logFile = "oozie-app.log"; 060 } 061 062 this.logFilter = logFilter; 063 this.logFile = logFile; 064 this.logPath = logPath; 065 this.logRotation = logRotationSecs * 1000l; 066 bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 4096); 067 } 068 069 public XLogStreamer(XLogFilter logFilter) { 070 this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class) 071 .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation()); 072 073 } 074 075 public XLogStreamer(XLogFilter logFilter, Map<String, String[]> params) { 076 this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class) 077 .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation()); 078 this.requestParam = params; 079 080 } 081 082 083 public XLogStreamer(Map<String, String[]> params) throws CommandException { 084 this(new XLogFilter(new XLogUserFilterParam(params))); 085 this.requestParam = params; 086 } 087 088 /** 089 * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after 090 * applying the filters. 091 * 092 * @param writer the target writer 093 * @param startTime the start time 094 * @param endTime the end time 095 * @throws IOException Signals that an I/O exception has occurred. 096 */ 097 public void streamLog(Writer writer, Date startTime, Date endTime) throws IOException { 098 streamLog(writer, startTime, endTime, true); 099 } 100 101 /** 102 * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after 103 * applying the filters 104 * 105 * @param writer the writer 106 * @param startTime the start time 107 * @param endTime the end time 108 * @param appendDebug the append debug 109 * @throws IOException Signals that an I/O exception has occurred. 110 */ 111 public void streamLog(Writer writer, Date startTime, Date endTime, boolean appendDebug) throws IOException { 112 // Get a Reader for the log file(s) 113 BufferedReader reader = new BufferedReader(getReader(startTime, endTime)); 114 try { 115 if (appendDebug) { 116 if (!StringUtils.isEmpty(logFilter.getTruncatedMessage())) { 117 writer.write(logFilter.getTruncatedMessage()); 118 } 119 if (logFilter.isDebugMode()) { 120 writer.write(logFilter.getDebugMessage()); 121 } 122 } 123 // Process the entire logs from the reader using the logFilter 124 new TimestampedMessageParser(reader, logFilter).processRemaining(writer, this); 125 } 126 finally { 127 reader.close(); 128 } 129 } 130 131 /** 132 * Returns a BufferedReader configured to read the log files based on the given startTime and endTime. 133 * 134 * @param startTime the start time 135 * @param endTime the end time 136 * @return A BufferedReader for the log files 137 * @throws IOException Signals that an I/O exception has occurred. 138 */ 139 140 private MultiFileReader getReader(Date startTime, Date endTime) throws IOException { 141 calculateAndValidateDateRange(startTime, endTime); 142 return new MultiFileReader(getFileList(logFilter.getStartDate(), logFilter.getEndDate())); 143 } 144 145 protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException { 146 logFilter.calculateAndCheckDates(startTime, endTime); 147 logFilter.validateDateRange(startTime, endTime); 148 } 149 150 public BufferedReader makeReader(Date startTime, Date endTime) throws IOException { 151 return new BufferedReader(getReader(startTime, endTime)); 152 } 153 154 /** 155 * Gets the log file list for specific date range. 156 * 157 * @param startTime the start time 158 * @param endTime the end time 159 * @return log file list 160 * @throws IOException Signals that an I/O exception has occurred. 161 */ 162 private ArrayList<File> getFileList(Date startTime, Date endTime) throws IOException { 163 long startTimeMillis = 0; 164 long endTimeMillis; 165 if (startTime != null) { 166 startTimeMillis = startTime.getTime(); 167 } 168 if (endTime == null) { 169 endTimeMillis = System.currentTimeMillis(); 170 } 171 else { 172 endTimeMillis = endTime.getTime(); 173 } 174 File dir = new File(logPath); 175 return getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile); 176 } 177 178 /** 179 * File along with the modified time which will be used to sort later. 180 */ 181 public class FileInfo implements Comparable<FileInfo> { 182 File file; 183 long modTime; 184 185 public FileInfo(File file, long modTime) { 186 this.file = file; 187 this.modTime = modTime; 188 } 189 190 public File getFile() { 191 return file; 192 } 193 194 public long getModTime() { 195 return modTime; 196 } 197 198 public int compareTo(FileInfo fileInfo) { 199 long diff = this.modTime - fileInfo.modTime; 200 if (diff > 0) { 201 return 1; 202 } 203 else if (diff < 0) { 204 return -1; 205 } 206 else { 207 return 0; 208 } 209 } 210 } 211 212 /** 213 * Gets the file list that will have the logs between startTime and endTime. 214 * 215 * @param dir the directory to list 216 * @param startTime the start time 217 * @param endTime the end time 218 * @param logRotationTime the log rotation time 219 * @param logFile the file to look up 220 * @return List of files to be streamed 221 */ 222 private ArrayList<File> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) { 223 String[] children = dir.list(); 224 ArrayList<FileInfo> fileList = new ArrayList<FileInfo>(); 225 if (children == null) { 226 return new ArrayList<File>(); 227 } 228 else { 229 for (int i = 0; i < children.length; i++) { 230 String fileName = children[i]; 231 if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) { 232 continue; 233 } 234 File file = new File(dir.getAbsolutePath(), fileName); 235 if (fileName.endsWith(".gz")) { 236 long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime); 237 if (gzFileCreationTime != -1) { 238 fileList.add(new FileInfo(file, gzFileCreationTime)); 239 } 240 continue; 241 } 242 long modTime = file.lastModified(); 243 if (modTime < startTime) { 244 continue; 245 } 246 if (modTime / logRotationTime > (endTime / logRotationTime + 1)) { 247 continue; 248 } 249 fileList.add(new FileInfo(file, modTime)); 250 } 251 } 252 Collections.sort(fileList); 253 ArrayList<File> files = new ArrayList<File>(fileList.size()); 254 for (FileInfo info : fileList) { 255 files.add(info.getFile()); 256 } 257 return files; 258 } 259 260 /** 261 * This pattern matches the end of a gzip filename to have a format like "-YYYY-MM-dd-HH.gz" with capturing groups for each part 262 * of the date 263 */ 264 public static final Pattern gzTimePattern = Pattern.compile(".*-(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)-(\\d\\d)\\.gz"); 265 266 /** 267 * Returns the creation time of the .gz archive if it is relevant to the job 268 * 269 * @param fileName 270 * @param startTime 271 * @param endTime 272 * @return Modification time of .gz file after checking if it is relevant to the job 273 */ 274 private long getGZFileCreationTime(String fileName, long startTime, long endTime) { 275 // Default return value of -1 to exclude the file 276 long returnVal = -1; 277 278 // Include oozie.log as oozie.log.gz if it is accidentally GZipped 279 if (fileName.equals("oozie.log.gz")) { 280 LOG.warn("oozie.log has been GZipped, which is unexpected"); 281 // Return a value other than -1 to include the file in list 282 returnVal = 0; 283 } 284 else { 285 Matcher m = gzTimePattern.matcher(fileName); 286 if (m.matches() && m.groupCount() == 4) { 287 int year = Integer.parseInt(m.group(1)); 288 int month = Integer.parseInt(m.group(2)); 289 int day = Integer.parseInt(m.group(3)); 290 int hour = Integer.parseInt(m.group(4)); 291 int minute = 0; 292 Calendar calendarEntry = Calendar.getInstance(); 293 calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August) 294 long logFileStartTime = calendarEntry.getTimeInMillis(); 295 long milliSecondsPerHour = 3600000; 296 long logFileEndTime = logFileStartTime + milliSecondsPerHour; 297 /* To check whether the log content is there in the initial or later part of the log file or 298 the log content is contained entirely within this log file or 299 the entire log file contains the event log where the event spans across hours 300 */ 301 if ((startTime >= logFileStartTime && startTime <= logFileEndTime) 302 || (endTime >= logFileStartTime && endTime <= logFileEndTime) 303 || (startTime <= logFileStartTime && endTime >= logFileEndTime)) { 304 returnVal = logFileStartTime; 305 } 306 } 307 else { 308 LOG.debug("Filename " + fileName + " does not match the expected format"); 309 returnVal = -1; 310 } 311 } 312 return returnVal; 313 } 314 315 public boolean isLogEnabled() { 316 return Services.get().get(XLogService.class).getLogOverWS(); 317 } 318 319 public String getLogType() { 320 return RestConstants.JOB_SHOW_LOG; 321 } 322 323 public XLogFilter getXLogFilter() { 324 return logFilter; 325 } 326 327 public String getLogDisableMessage() { 328 return "Log streaming disabled!!"; 329 } 330 331 public Map<String, String[]> getRequestParam() { 332 return requestParam; 333 } 334 335 public boolean shouldFlushOutput(int writtenBytes) { 336 this.totalDataWritten += writtenBytes; 337 if (this.totalDataWritten > getBufferLen()) { 338 this.totalDataWritten = 0; 339 return true; 340 } 341 else { 342 return false; 343 } 344 } 345 346 public int getBufferLen() { 347 return bufferLen; 348 } 349}