/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.regex.Pattern;

import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.util.LogLine.MATCHED_PATTERN;

/**
 * Encapsulates the parsing and filtering of the log messages from a BufferedReader. It makes sure not to read in the entire log
 * into memory at the same time; at most, it will have two messages (which can be multi-line in the case of exception stack traces).
 * <p>
 * To use this class: Calling {@link TimestampedMessageParser#increment()} will tell the parser to read the next message from the
 * Reader. It will return true if a message was parsed and false if there are none left. Calling
 * {@link TimestampedMessageParser#getLastMessage()} and {@link TimestampedMessageParser#getLastTimestamp()} will return the last
 * message and timestamp, respectively, that were parsed when {@link TimestampedMessageParser#increment()} was called. Calling
 * {@link TimestampedMessageParser#processRemaining(java.io.Writer,org.apache.oozie.util.XLogStreamer)} will write the
 * remaining log messages to the given Writer.
 */
public class TimestampedMessageParser {

    static final String SYSTEM_LINE_SEPARATOR = System.getProperty("line.separator");
    protected BufferedReader reader;
    /** Look-ahead line: the first line of the message that the next {@link #increment()} call will return. */
    private LogLine nextLine = null;
    private String lastTimestamp = null;
    private final XLogFilter filter;
    /** Set once the reader is exhausted (or parsing was cut short by the filter's limit / end date). */
    private boolean empty = false;
    private String lastMessage = null;
    /**
     * Filter verdict for the most recent line that matched the split pattern. Lines that do not match the pattern
     * (e.g. stack trace continuation lines) inherit this value.
     */
    private boolean patternMatched = false;
    /** Number of lines with log parts accepted so far; only incremented while the filter has a log limit set. */
    public int count = 0;
    private final Pattern splitPattern;

    /**
     * Creates a TimestampedMessageParser with the given BufferedReader and filter.
     *
     * @param reader The BufferedReader to get the log messages from
     * @param filter The filter; if null, a default {@link XLogFilter} is used
     */
    public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
        this.reader = reader;
        // Resolve the fallback BEFORE storing the field: every parsing method dereferences this.filter,
        // so assigning the default only to the parameter would leave the field null and cause an NPE later.
        XLogFilter effectiveFilter = (filter != null) ? filter : new XLogFilter();
        effectiveFilter.constructPattern();
        this.filter = effectiveFilter;
        String regEx = XLogFilter.PREFIX_REGEX + effectiveFilter.getFilterPattern().pattern();
        this.splitPattern = Pattern.compile(regEx);
    }

    /**
     * Causes the next message and timestamp to be parsed from the BufferedReader. A message consists of one line that
     * carries a timestamp followed by every subsequent accepted line that does not (e.g. a stack trace).
     *
     * @return true if a message was parsed and is available through {@link #getLastMessage()}; false if there are no
     *         messages left
     * @throws IOException If there was a problem reading from the Reader
     */
    public boolean increment() throws IOException {
        if (empty) {
            return false;
        }

        StringBuilder message = new StringBuilder();
        if (nextLine == null) {     // first time only
            nextLine = parseNextLogLine();
            if (nextLine == null || nextLine.getLine() == null) {
                // reader finished
                empty = true;
                return false;
            }
        }
        lastTimestamp = parseTimestamp(nextLine);

        // Keep appending lines until the look-ahead line starts a new message (has a timestamp)
        // or the reader runs out.
        boolean nextMessageFound = false;
        while (!nextMessageFound) {
            message.append(nextLine.getLine()).append(SYSTEM_LINE_SEPARATOR);
            nextLine = parseNextLogLine();
            if (nextLine == null || nextLine.getLine() == null) {
                // reader finished
                empty = true;
                break;
            }
            nextMessageFound = parseTimestamp(nextLine) != null;
        }

        lastMessage = message.toString();
        return true;
    }

    /**
     * Returns the timestamp from the last message that was parsed.
     *
     * @return the timestamp from the last message that was parsed
     */
    public String getLastTimestamp() {
        return lastTimestamp;
    }

    /**
     * Returns the message that was last parsed.
     *
     * @return the message that was last parsed
     */
    public String getLastMessage() {
        return lastMessage;
    }

    /**
     * Closes the Reader.
     *
     * @throws IOException if the reader can't be closed
     */
    public void closeReader() throws IOException {
        reader.close();
    }

    /**
     * Reads lines from the Reader until one is accepted by the filter. It can also handle multi-line messages
     * (i.e. exception stack traces): a line that does not match the split pattern is accepted or rejected according to
     * the most recent line that did match.
     *
     * @return the accepted LogLine; a LogLine whose {@code getLine()} is null when the Reader has no lines left; or
     *         null when the filter's log limit has been reached or the line's timestamp is past the filter's end date
     * @throws IOException in case of an error in the Reader
     */
    protected LogLine parseNextLogLine() throws IOException {
        String line;
        LogLine logLine = new LogLine();
        while ((line = reader.readLine()) != null) {
            logLine.setLine(line);
            logLine.setLogParts(null);
            filter.splitLogMessage(logLine, splitPattern);
            // Re-evaluate the filter only when the line matches the split pattern; otherwise keep the
            // previous patternMatched value. This is needed when parsing stack traces.
            if (logLine.getMatchedPattern() != MATCHED_PATTERN.NONE) {
                patternMatched = filter.splitsMatches(logLine);
            }
            if (!patternMatched) {
                continue;
            }
            if (logLine.getLogParts() != null) {
                // The limit and end date apply only to lines that were split into log parts.
                if (filter.getLogLimit() != -1) {
                    if (count >= filter.getLogLimit()) {
                        return null;
                    }
                    count++;
                }
                if (filter.getEndDate() != null) {
                    // Ignore the milli second part: only the first 19 characters of the timestamp are compared
                    String timestampToSeconds = logLine.getLogParts().get(0).substring(0, 19);
                    if (timestampToSeconds.compareTo(filter.getFormattedEndDate()) > 0) {
                        return null;
                    }
                }
            }
            return logLine;
        }
        // Reader exhausted: signal it with a LogLine that has no line.
        logLine.setLine(null);
        return logLine;
    }

    /**
     * Parses the timestamp out of the passed in line. If there isn't one, it
     * returns null.
     *
     * @param logLine The LogLine to check
     * @return the first log part of the line, or null if the line is null or has no log parts
     */
    private String parseTimestamp(LogLine logLine) {
        String timestamp = null;
        if (logLine != null && logLine.getLogParts() != null && logLine.getLogParts().size() > 0) {
            timestamp = logLine.getLogParts().get(0);
        }
        return timestamp;
    }

    /**
     * Streams all remaining log messages to the passed in Writer, flushing whenever the log streamer asks for it and
     * once more at the end.
     *
     * @param writer the target writer
     * @param logStreamer the log streamer consulted, with the length of each written message, on whether to flush
     * @throws IOException in case of IO error
     */
    public void processRemaining(Writer writer, XLogStreamer logStreamer) throws IOException {
        while (increment()) {
            writer.write(lastMessage);
            if (logStreamer.shouldFlushOutput(lastMessage.length())) {
                writer.flush();
            }
        }
        writer.flush();
    }

    /**
     * Splits the log message into parts by delegating to the filter.
     *
     * @param line the line to split
     * @return List of log parts
     */
    protected ArrayList<String> splitLogMessage(String line) {
        return filter.splitLogMessage(line);
    }

}