001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.util; 020 021import java.io.BufferedReader; 022import java.io.IOException; 023import java.io.Writer; 024import java.util.ArrayList; 025import org.apache.oozie.service.Services; 026import org.apache.oozie.service.XLogStreamingService; 027 028 /** 029 * Encapsulates the parsing and filtering of the log messages from a BufferedReader. It makes sure not to read in the entire log 030 * into memory at the same time; at most, it will have two messages (which can be multi-line in the case of exception stack traces). 031 * <p> 032 * To use this class: Calling {@link TimestampedMessageParser#increment()} will tell the parser to read the next message from the 033 * Reader. It will return true if there are more messages and false if not. Calling 034 * {@link TimestampedMessageParser#getLastMessage()} and {@link TimestampedMessageParser#getLastTimestamp()} will return the last 035 * message and timestamp, respectively, that were parsed when {@link TimestampedMessageParser#increment()} was called. Calling 036 * {@link TimestampedMessageParser#processRemaining(java.io.Writer)} will write the remaining log messages to the given Writer. 037 */ 038public class TimestampedMessageParser { 039 040 protected BufferedReader reader; 041 private String nextLine = null; 042 private String lastTimestamp = null; 043 private XLogFilter filter; 044 private boolean empty = false; 045 private String lastMessage = null; 046 private boolean patternMatched = false; 047 public int count = 0; 048 049 /** 050 * Creates a TimestampedMessageParser with the given BufferedReader and filter. 051 * 052 * @param reader The BufferedReader to get the log messages from 053 * @param filter The filter 054 */ 055 public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) { 056 this.reader = reader; 057 this.filter = filter; 058 if (filter == null) { 059 filter = new XLogFilter(); 060 } 061 filter.constructPattern(); 062 } 063 064 065 /** 066 * Causes the next message and timestamp to be parsed from the BufferedReader. 067 * 068 * @return true if there are more messages left; false if not 069 * @throws IOException If there was a problem reading from the Reader 070 */ 071 public boolean increment() throws IOException { 072 if (empty) { 073 return false; 074 } 075 076 StringBuilder message = new StringBuilder(); 077 078 if (nextLine == null) { // first time only 079 nextLine = parseNextLine(); 080 if (nextLine == null) { // reader finished 081 empty = true; 082 return false; 083 } 084 } 085 lastTimestamp = parseTimestamp(nextLine); 086 String nextTimestamp = null; 087 while (nextTimestamp == null) { 088 message.append(nextLine).append("\n"); 089 nextLine = parseNextLine(); 090 if (nextLine != null) { 091 nextTimestamp = parseTimestamp(nextLine); // exit loop if we have a timestamp, continue if not 092 } 093 else { // reader finished 094 empty = true; 095 nextTimestamp = ""; // exit loop 096 } 097 } 098 099 lastMessage = message.toString(); 100 return true; 101 } 102 103 /** 104 * Returns the timestamp from the last message that was parsed. 105 * 106 * @return the timestamp from the last message that was parsed 107 */ 108 public String getLastTimestamp() { 109 return lastTimestamp; 110 } 111 112 /** 113 * Returns the message that was last parsed. 114 * 115 * @return the message that was last parsed 116 */ 117 public String getLastMessage() { 118 return lastMessage; 119 } 120 121 /** 122 * Closes the Reader. 123 * 124 * @throws IOException 125 */ 126 public void closeReader() throws IOException { 127 reader.close(); 128 } 129 130 /** 131 * Reads the next line from the Reader and checks if it matches the filter. It can also handle multi-line messages (i.e. 132 * exception stack traces). If it returns null, then there are no lines left in the Reader. 133 * 134 * @return The next line, or null 135 * @throws IOException 136 */ 137 protected String parseNextLine() throws IOException { 138 String line; 139 while ((line = reader.readLine()) != null) { 140 ArrayList<String> logParts = filter.splitLogMessage(line); 141 if (logParts != null) { 142 patternMatched = filter.matches(logParts); 143 } 144 if (patternMatched) { 145 if (filter.getLogLimit() != -1) { 146 if (logParts != null) { 147 if (count >= filter.getLogLimit()) { 148 return null; 149 } 150 count++; 151 } 152 } 153 if (logParts != null) { 154 if (filter.getEndDate() != null) { 155 //Ignore the milli second part 156 if (logParts.get(0).substring(0, 19).compareTo(filter.getFormattedEndDate()) > 0) 157 return null; 158 } 159 } 160 return line; 161 } 162 } 163 return line; 164 } 165 166 /** 167 * Parses the timestamp out of the passed in line. If there isn't one, it returns null. 168 * 169 * @param line The line to check 170 * @return the timestamp of the line, or null 171 */ 172 private String parseTimestamp(String line) { 173 String timestamp = null; 174 ArrayList<String> logParts = filter.splitLogMessage(line); 175 if (logParts != null) { 176 timestamp = logParts.get(0); 177 } 178 return timestamp; 179 } 180 181 /** 182 * Streams log messages to the passed in Writer. Flushes the log writing 183 * based on buffer len 184 * 185 * @param writer 186 * @param bufferLen maximum len of log buffer 187 * @param bytesWritten num bytes already written to writer 188 * @throws IOException 189 */ 190 public void processRemaining(Writer writer, int bufferLen, int bytesWritten) throws IOException { 191 while (increment()) { 192 writer.write(lastMessage); 193 bytesWritten += lastMessage.length(); 194 if (bytesWritten > bufferLen) { 195 writer.flush(); 196 bytesWritten = 0; 197 } 198 } 199 writer.flush(); 200 } 201 202 /** 203 * Streams log messages to the passed in Writer, with zero bytes already 204 * written 205 * 206 * @param writer 207 * @param bufferLen maximum len of log buffer 208 * @throws IOException 209 */ 210 public void processRemaining(Writer writer, int bufferLen) throws IOException { 211 processRemaining(writer, bufferLen, 0); 212 } 213 214 /** 215 * Streams log messages to the passed in Writer, with default buffer len 4K 216 * and zero bytes already written 217 * 218 * @param writer 219 * @throws IOException 220 */ 221 public void processRemaining(Writer writer) throws IOException { 222 processRemaining(writer, Services.get().get(XLogStreamingService.class).getBufferLen()); 223 } 224 225}