001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import java.io.BufferedReader;
022import java.io.IOException;
023import java.io.Writer;
024import java.util.ArrayList;
025import org.apache.oozie.service.Services;
026import org.apache.oozie.service.XLogStreamingService;
027
028  /**
029 * Encapsulates the parsing and filtering of the log messages from a BufferedReader. It makes sure not to read in the entire log
030 * into memory at the same time; at most, it will have two messages (which can be multi-line in the case of exception stack traces).
031 * <p>
032 * To use this class: Calling {@link TimestampedMessageParser#increment()} will tell the parser to read the next message from the
033 * Reader. It will return true if there are more messages and false if not. Calling
034 * {@link TimestampedMessageParser#getLastMessage()} and {@link TimestampedMessageParser#getLastTimestamp()} will return the last
035 * message and timestamp, respectively, that were parsed when {@link TimestampedMessageParser#increment()} was called. Calling
036 * {@link TimestampedMessageParser#processRemaining(java.io.Writer)} will write the remaining log messages to the given Writer.
037 */
038public class TimestampedMessageParser {
039
040    static final String SYSTEM_LINE_SEPARATOR = System.getProperty("line.separator");
041    protected BufferedReader reader;
042    private String nextLine = null;
043    private String lastTimestamp = null;
044    private XLogFilter filter;
045    private boolean empty = false;
046    private String lastMessage = null;
047    private boolean patternMatched = false;
048    public int count = 0;
049
050    /**
051     * Creates a TimestampedMessageParser with the given BufferedReader and filter.
052     *
053     * @param reader The BufferedReader to get the log messages from
054     * @param filter The filter
055     */
056    public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
057        this.reader = reader;
058        this.filter = filter;
059        if (filter == null) {
060            filter = new XLogFilter();
061        }
062        filter.constructPattern();
063    }
064
065
066    /**
067     * Causes the next message and timestamp to be parsed from the BufferedReader.
068     *
069     * @return true if there are more messages left; false if not
070     * @throws IOException If there was a problem reading from the Reader
071     */
072    public boolean increment() throws IOException {
073        if (empty) {
074            return false;
075        }
076
077        StringBuilder message = new StringBuilder();
078
079        if (nextLine == null) {     // first time only
080            nextLine = parseNextLine();
081            if (nextLine == null) { // reader finished
082                empty = true;
083                return false;
084            }
085        }
086        lastTimestamp = parseTimestamp(nextLine);
087        String nextTimestamp = null;
088        while (nextTimestamp == null) {
089            message.append(nextLine).append(SYSTEM_LINE_SEPARATOR);
090            nextLine = parseNextLine();
091            if (nextLine != null) {
092                nextTimestamp = parseTimestamp(nextLine);   // exit loop if we have a timestamp, continue if not
093            }
094            else {                                          // reader finished
095                empty = true;
096                nextTimestamp = "";                         // exit loop
097            }
098        }
099
100        lastMessage = message.toString();
101        return true;
102    }
103
104    /**
105     * Returns the timestamp from the last message that was parsed.
106     *
107     * @return the timestamp from the last message that was parsed
108     */
109    public String getLastTimestamp() {
110        return lastTimestamp;
111    }
112
113    /**
114     * Returns the message that was last parsed.
115     *
116     * @return the message that was last parsed
117     */
118    public String getLastMessage() {
119        return lastMessage;
120    }
121
122    /**
123     * Closes the Reader.
124     *
125     * @throws IOException
126     */
127    public void closeReader() throws IOException {
128        reader.close();
129    }
130
131    /**
132     * Reads the next line from the Reader and checks if it matches the filter.  It can also handle multi-line messages (i.e.
133     * exception stack traces).  If it returns null, then there are no lines left in the Reader.
134     *
135     * @return The next line, or null
136     * @throws IOException
137     */
138    protected String parseNextLine() throws IOException {
139        String line;
140        while ((line = reader.readLine()) != null) {
141            ArrayList<String> logParts = filter.splitLogMessage(line);
142            if (logParts != null) {
143                patternMatched = filter.matches(logParts);
144            }
145            if (patternMatched) {
146                if (filter.getLogLimit() != -1) {
147                    if (logParts != null) {
148                        if (count >= filter.getLogLimit()) {
149                            return null;
150                        }
151                        count++;
152                    }
153                }
154                if (logParts != null) {
155                    if (filter.getEndDate() != null) {
156                        //Ignore the milli second part
157                        if (logParts.get(0).substring(0, 19).compareTo(filter.getFormattedEndDate()) > 0)
158                            return null;
159                    }
160                }
161                return line;
162            }
163        }
164        return line;
165    }
166
167    /**
168     * Parses the timestamp out of the passed in line.  If there isn't one, it returns null.
169     *
170     * @param line The line to check
171     * @return the timestamp of the line, or null
172     */
173    private String parseTimestamp(String line) {
174        String timestamp = null;
175        ArrayList<String> logParts = filter.splitLogMessage(line);
176        if (logParts != null) {
177            timestamp = logParts.get(0);
178        }
179        return timestamp;
180    }
181
182    /**
183     * Streams log messages to the passed in Writer. Flushes the log writing
184     * based on buffer len
185     *
186     * @param writer
187     * @param bufferLen maximum len of log buffer
188     * @param bytesWritten num bytes already written to writer
189     * @throws IOException
190     */
191    public void processRemaining(Writer writer, int bufferLen, int bytesWritten) throws IOException {
192        while (increment()) {
193            writer.write(lastMessage);
194            bytesWritten += lastMessage.length();
195            if (bytesWritten > bufferLen) {
196                writer.flush();
197                bytesWritten = 0;
198            }
199        }
200        writer.flush();
201    }
202
203    /**
204     * Streams log messages to the passed in Writer, with zero bytes already
205     * written
206     *
207     * @param writer
208     * @param bufferLen maximum len of log buffer
209     * @throws IOException
210     */
211    public void processRemaining(Writer writer, int bufferLen) throws IOException {
212        processRemaining(writer, bufferLen, 0);
213    }
214
215    /**
216     * Streams log messages to the passed in Writer, with default buffer len 4K
217     * and zero bytes already written
218     *
219     * @param writer
220     * @throws IOException
221     */
222    public void processRemaining(Writer writer) throws IOException {
223        processRemaining(writer, Services.get().get(XLogStreamingService.class).getBufferLen());
224    }
225
226}