001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.util;
019
020import java.io.BufferedReader;
021import java.io.IOException;
022import java.io.Writer;
023import java.text.ParseException;
024import java.text.SimpleDateFormat;
025import java.util.ArrayList;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.oozie.service.Services;
029import org.apache.oozie.service.XLogStreamingService;
030
031  /**
032 * Encapsulates the parsing and filtering of the log messages from a BufferedReader. It makes sure not to read in the entire log
033 * into memory at the same time; at most, it will have two messages (which can be multi-line in the case of exception stack traces).
034 * <p>
035 * To use this class: Calling {@link TimestampedMessageParser#increment()} will tell the parser to read the next message from the
036 * Reader. It will return true if there are more messages and false if not. Calling
037 * {@link TimestampedMessageParser#getLastMessage()} and {@link TimestampedMessageParser#getLastTimestamp()} will return the last
038 * message and timestamp, respectively, that were parsed when {@link TimestampedMessageParser#increment()} was called. Calling
039 * {@link TimestampedMessageParser#processRemaining(java.io.Writer)} will write the remaining log messages to the given Writer.
040 */
041public class TimestampedMessageParser {
042
043    protected BufferedReader reader;
044    private String nextLine = null;
045    private String lastTimestamp = null;
046    private XLogFilter filter;
047    private boolean empty = false;
048    private String lastMessage = null;
049    private boolean patternMatched = false;
050    public int count = 0;
051
052    /**
053     * Creates a TimestampedMessageParser with the given BufferedReader and filter.
054     *
055     * @param reader The BufferedReader to get the log messages from
056     * @param filter The filter
057     */
058    public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
059        this.reader = reader;
060        this.filter = filter;
061        if (filter == null) {
062            filter = new XLogFilter();
063        }
064        filter.constructPattern();
065    }
066
067    /**
068     * Causes the next message and timestamp to be parsed from the BufferedReader.
069     *
070     * @return true if there are more messages left; false if not
071     * @throws IOException If there was a problem reading from the Reader
072     */
073    public boolean increment() throws IOException {
074        if (empty) {
075            return false;
076        }
077
078        StringBuilder message = new StringBuilder();
079
080        if (nextLine == null) {     // first time only
081            nextLine = parseNextLine();
082            if (nextLine == null) { // reader finished
083                empty = true;
084                return false;
085            }
086        }
087        lastTimestamp = parseTimestamp(nextLine);
088        String nextTimestamp = null;
089        while (nextTimestamp == null) {
090            message.append(nextLine).append("\n");
091            nextLine = parseNextLine();
092            if (nextLine != null) {
093                nextTimestamp = parseTimestamp(nextLine);   // exit loop if we have a timestamp, continue if not
094            }
095            else {                                          // reader finished
096                empty = true;
097                nextTimestamp = "";                         // exit loop
098            }
099        }
100
101        lastMessage = message.toString();
102        return true;
103    }
104
105    /**
106     * Returns the timestamp from the last message that was parsed.
107     *
108     * @return the timestamp from the last message that was parsed
109     */
110    public String getLastTimestamp() {
111        return lastTimestamp;
112    }
113
114    /**
115     * Returns the message that was last parsed.
116     *
117     * @return the message that was last parsed
118     */
119    public String getLastMessage() {
120        return lastMessage;
121    }
122
123    /**
124     * Closes the Reader.
125     *
126     * @throws IOException
127     */
128    public void closeReader() throws IOException {
129        reader.close();
130    }
131
132    /**
133     * Reads the next line from the Reader and checks if it matches the filter.  It can also handle multi-line messages (i.e.
134     * exception stack traces).  If it returns null, then there are no lines left in the Reader.
135     *
136     * @return The next line, or null
137     * @throws IOException
138     */
139    protected String parseNextLine() throws IOException {
140        String line;
141        while ((line = reader.readLine()) != null) {
142            ArrayList<String> logParts = filter.splitLogMessage(line);
143            if (logParts != null) {
144                patternMatched = filter.matches(logParts);
145            }
146            if (patternMatched) {
147                if (filter.getLogLimit() != -1) {
148                    if (logParts != null) {
149                        if (count >= filter.getLogLimit()) {
150                            return null;
151                        }
152                        count++;
153                    }
154                }
155                if (logParts != null) {
156                    if (filter.getEndDate() != null) {
157                        //Ignore the milli second part
158                        if (logParts.get(0).substring(0, 19).compareTo(filter.getFormattedEndDate()) > 0)
159                            return null;
160                    }
161                }
162                return line;
163            }
164        }
165        return line;
166    }
167
168    /**
169     * Parses the timestamp out of the passed in line.  If there isn't one, it returns null.
170     *
171     * @param line The line to check
172     * @return the timestamp of the line, or null
173     */
174    private String parseTimestamp(String line) {
175        String timestamp = null;
176        ArrayList<String> logParts = filter.splitLogMessage(line);
177        if (logParts != null) {
178            timestamp = logParts.get(0);
179        }
180        return timestamp;
181    }
182
183    /**
184     * Streams log messages to the passed in Writer. Flushes the log writing
185     * based on buffer len
186     *
187     * @param writer
188     * @param bufferLen maximum len of log buffer
189     * @param bytesWritten num bytes already written to writer
190     * @throws IOException
191     */
192    public void processRemaining(Writer writer, int bufferLen, int bytesWritten) throws IOException {
193        while (increment()) {
194            writer.write(lastMessage);
195            bytesWritten += lastMessage.length();
196            if (bytesWritten > bufferLen) {
197                writer.flush();
198                bytesWritten = 0;
199            }
200        }
201        writer.flush();
202    }
203
204    /**
205     * Streams log messages to the passed in Writer, with zero bytes already
206     * written
207     *
208     * @param writer
209     * @param bufferLen maximum len of log buffer
210     * @throws IOException
211     */
212    public void processRemaining(Writer writer, int bufferLen) throws IOException {
213        processRemaining(writer, bufferLen, 0);
214    }
215
216    /**
217     * Streams log messages to the passed in Writer, with default buffer len 4K
218     * and zero bytes already written
219     *
220     * @param writer
221     * @throws IOException
222     */
223    public void processRemaining(Writer writer) throws IOException {
224        processRemaining(writer, Services.get().get(XLogStreamingService.class).getBufferLen());
225    }
226
227}