001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.util; 019 020import java.io.BufferedReader; 021import java.io.IOException; 022import java.io.Writer; 023import java.text.ParseException; 024import java.text.SimpleDateFormat; 025import java.util.ArrayList; 026 027import org.apache.commons.lang.StringUtils; 028import org.apache.oozie.service.Services; 029import org.apache.oozie.service.XLogStreamingService; 030 031 /** 032 * Encapsulates the parsing and filtering of the log messages from a BufferedReader. It makes sure not to read in the entire log 033 * into memory at the same time; at most, it will have two messages (which can be multi-line in the case of exception stack traces). 034 * <p> 035 * To use this class: Calling {@link TimestampedMessageParser#increment()} will tell the parser to read the next message from the 036 * Reader. It will return true if there are more messages and false if not. Calling 037 * {@link TimestampedMessageParser#getLastMessage()} and {@link TimestampedMessageParser#getLastTimestamp()} will return the last 038 * message and timestamp, respectively, that were parsed when {@link TimestampedMessageParser#increment()} was called. Calling 039 * {@link TimestampedMessageParser#processRemaining(java.io.Writer)} will write the remaining log messages to the given Writer. 040 */ 041public class TimestampedMessageParser { 042 043 protected BufferedReader reader; 044 private String nextLine = null; 045 private String lastTimestamp = null; 046 private XLogFilter filter; 047 private boolean empty = false; 048 private String lastMessage = null; 049 private boolean patternMatched = false; 050 public int count = 0; 051 052 /** 053 * Creates a TimestampedMessageParser with the given BufferedReader and filter. 054 * 055 * @param reader The BufferedReader to get the log messages from 056 * @param filter The filter 057 */ 058 public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) { 059 this.reader = reader; 060 this.filter = filter; 061 if (filter == null) { 062 filter = new XLogFilter(); 063 } 064 filter.constructPattern(); 065 } 066 067 /** 068 * Causes the next message and timestamp to be parsed from the BufferedReader. 069 * 070 * @return true if there are more messages left; false if not 071 * @throws IOException If there was a problem reading from the Reader 072 */ 073 public boolean increment() throws IOException { 074 if (empty) { 075 return false; 076 } 077 078 StringBuilder message = new StringBuilder(); 079 080 if (nextLine == null) { // first time only 081 nextLine = parseNextLine(); 082 if (nextLine == null) { // reader finished 083 empty = true; 084 return false; 085 } 086 } 087 lastTimestamp = parseTimestamp(nextLine); 088 String nextTimestamp = null; 089 while (nextTimestamp == null) { 090 message.append(nextLine).append("\n"); 091 nextLine = parseNextLine(); 092 if (nextLine != null) { 093 nextTimestamp = parseTimestamp(nextLine); // exit loop if we have a timestamp, continue if not 094 } 095 else { // reader finished 096 empty = true; 097 nextTimestamp = ""; // exit loop 098 } 099 } 100 101 lastMessage = message.toString(); 102 return true; 103 } 104 105 /** 106 * Returns the timestamp from the last message that was parsed. 107 * 108 * @return the timestamp from the last message that was parsed 109 */ 110 public String getLastTimestamp() { 111 return lastTimestamp; 112 } 113 114 /** 115 * Returns the message that was last parsed. 116 * 117 * @return the message that was last parsed 118 */ 119 public String getLastMessage() { 120 return lastMessage; 121 } 122 123 /** 124 * Closes the Reader. 125 * 126 * @throws IOException 127 */ 128 public void closeReader() throws IOException { 129 reader.close(); 130 } 131 132 /** 133 * Reads the next line from the Reader and checks if it matches the filter. It can also handle multi-line messages (i.e. 134 * exception stack traces). If it returns null, then there are no lines left in the Reader. 135 * 136 * @return The next line, or null 137 * @throws IOException 138 */ 139 protected String parseNextLine() throws IOException { 140 String line; 141 while ((line = reader.readLine()) != null) { 142 ArrayList<String> logParts = filter.splitLogMessage(line); 143 if (logParts != null) { 144 patternMatched = filter.matches(logParts); 145 } 146 if (patternMatched) { 147 if (filter.getLogLimit() != -1) { 148 if (logParts != null) { 149 if (count >= filter.getLogLimit()) { 150 return null; 151 } 152 count++; 153 } 154 } 155 if (logParts != null) { 156 if (filter.getEndDate() != null) { 157 //Ignore the milli second part 158 if (logParts.get(0).substring(0, 19).compareTo(filter.getFormattedEndDate()) > 0) 159 return null; 160 } 161 } 162 return line; 163 } 164 } 165 return line; 166 } 167 168 /** 169 * Parses the timestamp out of the passed in line. If there isn't one, it returns null. 170 * 171 * @param line The line to check 172 * @return the timestamp of the line, or null 173 */ 174 private String parseTimestamp(String line) { 175 String timestamp = null; 176 ArrayList<String> logParts = filter.splitLogMessage(line); 177 if (logParts != null) { 178 timestamp = logParts.get(0); 179 } 180 return timestamp; 181 } 182 183 /** 184 * Streams log messages to the passed in Writer. Flushes the log writing 185 * based on buffer len 186 * 187 * @param writer 188 * @param bufferLen maximum len of log buffer 189 * @param bytesWritten num bytes already written to writer 190 * @throws IOException 191 */ 192 public void processRemaining(Writer writer, int bufferLen, int bytesWritten) throws IOException { 193 while (increment()) { 194 writer.write(lastMessage); 195 bytesWritten += lastMessage.length(); 196 if (bytesWritten > bufferLen) { 197 writer.flush(); 198 bytesWritten = 0; 199 } 200 } 201 writer.flush(); 202 } 203 204 /** 205 * Streams log messages to the passed in Writer, with zero bytes already 206 * written 207 * 208 * @param writer 209 * @param bufferLen maximum len of log buffer 210 * @throws IOException 211 */ 212 public void processRemaining(Writer writer, int bufferLen) throws IOException { 213 processRemaining(writer, bufferLen, 0); 214 } 215 216 /** 217 * Streams log messages to the passed in Writer, with default buffer len 4K 218 * and zero bytes already written 219 * 220 * @param writer 221 * @throws IOException 222 */ 223 public void processRemaining(Writer writer) throws IOException { 224 processRemaining(writer, Services.get().get(XLogStreamingService.class).getBufferLen()); 225 } 226 227}