This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import java.io.BufferedReader;
022import java.io.IOException;
023import java.io.Writer;
024import java.util.ArrayList;
025import org.apache.oozie.service.Services;
026import org.apache.oozie.service.XLogStreamingService;
027
028  /**
029 * Encapsulates the parsing and filtering of the log messages from a BufferedReader. It makes sure not to read in the entire log
030 * into memory at the same time; at most, it will have two messages (which can be multi-line in the case of exception stack traces).
031 * <p>
032 * To use this class: Calling {@link TimestampedMessageParser#increment()} will tell the parser to read the next message from the
033 * Reader. It will return true if there are more messages and false if not. Calling
034 * {@link TimestampedMessageParser#getLastMessage()} and {@link TimestampedMessageParser#getLastTimestamp()} will return the last
035 * message and timestamp, respectively, that were parsed when {@link TimestampedMessageParser#increment()} was called. Calling
036 * {@link TimestampedMessageParser#processRemaining(java.io.Writer)} will write the remaining log messages to the given Writer.
037 */
038public class TimestampedMessageParser {
039
040    protected BufferedReader reader;
041    private String nextLine = null;
042    private String lastTimestamp = null;
043    private XLogFilter filter;
044    private boolean empty = false;
045    private String lastMessage = null;
046    private boolean patternMatched = false;
047    public int count = 0;
048
049    /**
050     * Creates a TimestampedMessageParser with the given BufferedReader and filter.
051     *
052     * @param reader The BufferedReader to get the log messages from
053     * @param filter The filter
054     */
055    public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
056        this.reader = reader;
057        this.filter = filter;
058        if (filter == null) {
059            filter = new XLogFilter();
060        }
061        filter.constructPattern();
062    }
063
064
065    /**
066     * Causes the next message and timestamp to be parsed from the BufferedReader.
067     *
068     * @return true if there are more messages left; false if not
069     * @throws IOException If there was a problem reading from the Reader
070     */
071    public boolean increment() throws IOException {
072        if (empty) {
073            return false;
074        }
075
076        StringBuilder message = new StringBuilder();
077
078        if (nextLine == null) {     // first time only
079            nextLine = parseNextLine();
080            if (nextLine == null) { // reader finished
081                empty = true;
082                return false;
083            }
084        }
085        lastTimestamp = parseTimestamp(nextLine);
086        String nextTimestamp = null;
087        while (nextTimestamp == null) {
088            message.append(nextLine).append("\n");
089            nextLine = parseNextLine();
090            if (nextLine != null) {
091                nextTimestamp = parseTimestamp(nextLine);   // exit loop if we have a timestamp, continue if not
092            }
093            else {                                          // reader finished
094                empty = true;
095                nextTimestamp = "";                         // exit loop
096            }
097        }
098
099        lastMessage = message.toString();
100        return true;
101    }
102
103    /**
104     * Returns the timestamp from the last message that was parsed.
105     *
106     * @return the timestamp from the last message that was parsed
107     */
108    public String getLastTimestamp() {
109        return lastTimestamp;
110    }
111
112    /**
113     * Returns the message that was last parsed.
114     *
115     * @return the message that was last parsed
116     */
117    public String getLastMessage() {
118        return lastMessage;
119    }
120
121    /**
122     * Closes the Reader.
123     *
124     * @throws IOException
125     */
126    public void closeReader() throws IOException {
127        reader.close();
128    }
129
130    /**
131     * Reads the next line from the Reader and checks if it matches the filter.  It can also handle multi-line messages (i.e.
132     * exception stack traces).  If it returns null, then there are no lines left in the Reader.
133     *
134     * @return The next line, or null
135     * @throws IOException
136     */
137    protected String parseNextLine() throws IOException {
138        String line;
139        while ((line = reader.readLine()) != null) {
140            ArrayList<String> logParts = filter.splitLogMessage(line);
141            if (logParts != null) {
142                patternMatched = filter.matches(logParts);
143            }
144            if (patternMatched) {
145                if (filter.getLogLimit() != -1) {
146                    if (logParts != null) {
147                        if (count >= filter.getLogLimit()) {
148                            return null;
149                        }
150                        count++;
151                    }
152                }
153                if (logParts != null) {
154                    if (filter.getEndDate() != null) {
155                        //Ignore the milli second part
156                        if (logParts.get(0).substring(0, 19).compareTo(filter.getFormattedEndDate()) > 0)
157                            return null;
158                    }
159                }
160                return line;
161            }
162        }
163        return line;
164    }
165
166    /**
167     * Parses the timestamp out of the passed in line.  If there isn't one, it returns null.
168     *
169     * @param line The line to check
170     * @return the timestamp of the line, or null
171     */
172    private String parseTimestamp(String line) {
173        String timestamp = null;
174        ArrayList<String> logParts = filter.splitLogMessage(line);
175        if (logParts != null) {
176            timestamp = logParts.get(0);
177        }
178        return timestamp;
179    }
180
181    /**
182     * Streams log messages to the passed in Writer. Flushes the log writing
183     * based on buffer len
184     *
185     * @param writer
186     * @param bufferLen maximum len of log buffer
187     * @param bytesWritten num bytes already written to writer
188     * @throws IOException
189     */
190    public void processRemaining(Writer writer, int bufferLen, int bytesWritten) throws IOException {
191        while (increment()) {
192            writer.write(lastMessage);
193            bytesWritten += lastMessage.length();
194            if (bytesWritten > bufferLen) {
195                writer.flush();
196                bytesWritten = 0;
197            }
198        }
199        writer.flush();
200    }
201
202    /**
203     * Streams log messages to the passed in Writer, with zero bytes already
204     * written
205     *
206     * @param writer
207     * @param bufferLen maximum len of log buffer
208     * @throws IOException
209     */
210    public void processRemaining(Writer writer, int bufferLen) throws IOException {
211        processRemaining(writer, bufferLen, 0);
212    }
213
214    /**
215     * Streams log messages to the passed in Writer, with default buffer len 4K
216     * and zero bytes already written
217     *
218     * @param writer
219     * @throws IOException
220     */
221    public void processRemaining(Writer writer) throws IOException {
222        processRemaining(writer, Services.get().get(XLogStreamingService.class).getBufferLen());
223    }
224
225}