/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;

/**
 * Encapsulates the parsing and filtering of the log messages from a BufferedReader. It makes sure not to read in the entire log
 * into memory at the same time; at most, it will have two messages (which can be multi-line in the case of exception stack traces).
 * <p>
 * To use this class: Calling {@link TimestampedMessageParser#increment()} will tell the parser to read the next message from the
 * Reader. It will return true if there are more messages and false if not. Calling
 * {@link TimestampedMessageParser#getLastMessage()} and {@link TimestampedMessageParser#getLastTimestamp()} will return the last
 * message and timestamp, respectively, that were parsed when {@link TimestampedMessageParser#increment()} was called. Calling
 * {@link TimestampedMessageParser#processRemaining(java.io.Writer)} will write the remaining log messages to the given Writer.
 */
public class TimestampedMessageParser {

    static final String SYSTEM_LINE_SEPARATOR = System.getProperty("line.separator");

    /**
     * Number of leading timestamp characters compared against the filter's formatted end date; per the original intent this
     * drops the millisecond part of the timestamp.
     */
    private static final int END_DATE_COMPARE_LENGTH = 19;

    protected BufferedReader reader;
    // Look-ahead line: the first line of the message that the next call to increment() will return.
    private String nextLine = null;
    private String lastTimestamp = null;
    private final XLogFilter filter;
    // Set once the reader (or the filter's limit / end date) has been exhausted.
    private boolean empty = false;
    private String lastMessage = null;
    // Match result of the most recent line that parsed as a log message; continuation lines inherit it.
    private boolean patternMatched = false;
    // Number of matching message lines returned so far, checked against the filter's log limit.
    public int count = 0;

    /**
     * Creates a TimestampedMessageParser with the given BufferedReader and filter.
     *
     * @param reader The BufferedReader to get the log messages from
     * @param filter The filter; if null, a default {@link XLogFilter} is used
     */
    public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
        this.reader = reader;
        // Store the fallback in the field (not just the parameter) so later parsing never dereferences a null filter.
        this.filter = (filter != null) ? filter : new XLogFilter();
        this.filter.constructPattern();
    }

    /**
     * Causes the next message and timestamp to be parsed from the BufferedReader.
     *
     * @return true if there are more messages left; false if not
     * @throws IOException If there was a problem reading from the Reader
     */
    public boolean increment() throws IOException {
        if (empty) {
            return false;
        }

        if (nextLine == null) { // first call only: prime the look-ahead line
            nextLine = parseNextLine();
            if (nextLine == null) { // reader finished
                empty = true;
                return false;
            }
        }
        lastTimestamp = parseTimestamp(nextLine);

        // Accumulate lines until the next line carrying a timestamp (start of the next message) or the end of input;
        // lines without a timestamp (e.g. stack-trace frames) belong to the current message.
        StringBuilder message = new StringBuilder();
        String nextTimestamp = null;
        while (nextTimestamp == null) {
            message.append(nextLine).append(SYSTEM_LINE_SEPARATOR);
            nextLine = parseNextLine();
            if (nextLine != null) {
                nextTimestamp = parseTimestamp(nextLine); // exit loop if we have a timestamp, continue if not
            }
            else { // reader finished
                empty = true;
                nextTimestamp = ""; // exit loop
            }
        }

        lastMessage = message.toString();
        return true;
    }

    /**
     * Returns the timestamp from the last message that was parsed.
     *
     * @return the timestamp from the last message that was parsed
     */
    public String getLastTimestamp() {
        return lastTimestamp;
    }

    /**
     * Returns the message that was last parsed.
     *
     * @return the message that was last parsed
     */
    public String getLastMessage() {
        return lastMessage;
    }

    /**
     * Closes the Reader.
     *
     * @throws IOException If the Reader could not be closed
     */
    public void closeReader() throws IOException {
        reader.close();
    }

    /**
     * Reads the next line from the Reader and checks if it matches the filter. It can also handle multi-line messages (i.e.
     * exception stack traces). If it returns null, then there are no lines left in the Reader, or the filter's log limit or
     * end date has been reached.
     *
     * @return The next line, or null
     * @throws IOException If there was a problem reading from the Reader
     */
    protected String parseNextLine() throws IOException {
        String line;
        while ((line = reader.readLine()) != null) {
            ArrayList<String> logParts = filter.splitLogMessage(line);
            if (logParts != null) {
                // Only lines that parse as log messages update the match state; continuation lines keep the
                // result of the preceding message line so whole multi-line messages are kept or skipped together.
                patternMatched = filter.matches(logParts);
            }
            if (!patternMatched) {
                continue;
            }
            if (logParts != null) {
                int logLimit = filter.getLogLimit();
                if (logLimit != -1) {
                    if (count >= logLimit) {
                        return null;
                    }
                    count++;
                }
                if (filter.getEndDate() != null && isAfterEndDate(logParts.get(0))) {
                    return null;
                }
            }
            return line;
        }
        return null; // reader finished
    }

    /**
     * Checks whether a timestamp lies beyond the filter's end date. The comparison is a lexicographic one between the first
     * {@value #END_DATE_COMPARE_LENGTH} characters of the timestamp and {@code filter.getFormattedEndDate()}.
     *
     * @param timestamp the timestamp parsed from a log line
     * @return true if the timestamp compares greater than the formatted end date
     */
    private boolean isAfterEndDate(String timestamp) {
        // Ignore the millisecond part; avoid StringIndexOutOfBoundsException for unexpectedly short timestamps.
        String truncated = timestamp.length() > END_DATE_COMPARE_LENGTH
                ? timestamp.substring(0, END_DATE_COMPARE_LENGTH)
                : timestamp;
        return truncated.compareTo(filter.getFormattedEndDate()) > 0;
    }

    /**
     * Parses the timestamp out of the passed in line. If there isn't one, it returns null.
     *
     * @param line The line to check
     * @return the timestamp of the line, or null
     */
    private String parseTimestamp(String line) {
        ArrayList<String> logParts = filter.splitLogMessage(line);
        return (logParts != null) ? logParts.get(0) : null;
    }

    /**
     * Streams the remaining log messages to the passed in Writer, flushing whenever the amount written since the last
     * flush exceeds {@code bufferLen}. The writer is always flushed once more at the end.
     *
     * @param writer the Writer to stream messages to
     * @param bufferLen maximum len of log buffer before a flush
     * @param bytesWritten amount already written to writer since its last flush (counted in characters, as is the
     *        rest of the accounting here)
     * @throws IOException If there was a problem reading from the Reader or writing to the Writer
     */
    public void processRemaining(Writer writer, int bufferLen, int bytesWritten) throws IOException {
        while (increment()) {
            writer.write(lastMessage);
            bytesWritten += lastMessage.length();
            if (bytesWritten > bufferLen) {
                writer.flush();
                bytesWritten = 0;
            }
        }
        writer.flush();
    }

    /**
     * Streams the remaining log messages to the passed in Writer, with zero bytes already written.
     *
     * @param writer the Writer to stream messages to
     * @param bufferLen maximum len of log buffer before a flush
     * @throws IOException If there was a problem reading from the Reader or writing to the Writer
     */
    public void processRemaining(Writer writer, int bufferLen) throws IOException {
        processRemaining(writer, bufferLen, 0);
    }

    /**
     * Streams the remaining log messages to the passed in Writer, using the buffer length reported by the
     * {@link XLogStreamingService} and zero bytes already written.
     *
     * @param writer the Writer to stream messages to
     * @throws IOException If there was a problem reading from the Reader or writing to the Writer
     */
    public void processRemaining(Writer writer) throws IOException {
        processRemaining(writer, Services.get().get(XLogStreamingService.class).getBufferLen());
    }

}