001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import java.io.BufferedReader;
022import java.io.IOException;
023import java.io.Writer;
024import java.util.ArrayList;
025import java.util.regex.Pattern;
026
027import org.apache.oozie.service.Services;
028import org.apache.oozie.service.XLogStreamingService;
029import org.apache.oozie.util.LogLine.MATCHED_PATTERN;
030
031  /**
032 * Encapsulates the parsing and filtering of the log messages from a BufferedReader. It makes sure not to read in the entire log
033 * into memory at the same time; at most, it will have two messages (which can be multi-line in the case of exception stack traces).
034 * <p>
035 * To use this class: Calling {@link TimestampedMessageParser#increment()} will tell the parser to read the next message from the
036 * Reader. It will return true if there are more messages and false if not. Calling
037 * {@link TimestampedMessageParser#getLastMessage()} and {@link TimestampedMessageParser#getLastTimestamp()} will return the last
038 * message and timestamp, respectively, that were parsed when {@link TimestampedMessageParser#increment()} was called. Calling
039 * {@link TimestampedMessageParser#processRemaining(java.io.Writer,org.apache.oozie.util.XLogStreamer)} will write the
040 * remaining log messages to the given Writer.
041 */
042public class TimestampedMessageParser {
043
044    static final String SYSTEM_LINE_SEPARATOR = System.getProperty("line.separator");
045    protected BufferedReader reader;
046    private LogLine nextLine = null;
047    private String lastTimestamp = null;
048    private XLogFilter filter;
049    private boolean empty = false;
050    private String lastMessage = null;
051    private boolean patternMatched = false;
052    public int count = 0;
053    private Pattern splitPattern = null;
054
055    /**
056     * Creates a TimestampedMessageParser with the given BufferedReader and filter.
057     *
058     * @param reader The BufferedReader to get the log messages from
059     * @param filter The filter
060     */
061    public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
062        this.reader = reader;
063        this.filter = filter;
064        if (filter == null) {
065            filter = new XLogFilter();
066        }
067        filter.constructPattern();
068        String regEx = XLogFilter.PREFIX_REGEX + filter.getFilterPattern().pattern();
069        this.splitPattern = Pattern.compile(regEx);
070    }
071
072
073    /**
074     * Causes the next message and timestamp to be parsed from the BufferedReader.
075     *
076     * @return true if there are more messages left; false if not
077     * @throws IOException If there was a problem reading from the Reader
078     */
079    public boolean increment() throws IOException {
080        if (empty) {
081            return false;
082        }
083
084        StringBuilder message = new StringBuilder();
085        if (nextLine == null) {     // first time only
086            nextLine = parseNextLogLine();
087            if (nextLine == null || nextLine.getLine() == null) {
088                // reader finished
089                empty = true;
090                return false;
091            }
092        }
093        lastTimestamp = parseTimestamp(nextLine);
094        String nextTimestamp = null;
095        while (nextTimestamp == null) {
096            message.append(nextLine.getLine()).append(SYSTEM_LINE_SEPARATOR);
097            nextLine = parseNextLogLine();
098            if (nextLine != null && nextLine.getLine() != null) {
099                // exit loop if we have a timestamp, continue if not
100                nextTimestamp = parseTimestamp(nextLine);
101            }
102            else {                                          // reader finished
103                empty = true;
104                nextTimestamp = "";                         // exit loop
105            }
106        }
107
108        lastMessage = message.toString();
109        return true;
110    }
111
112    /**
113     * Returns the timestamp from the last message that was parsed.
114     *
115     * @return the timestamp from the last message that was parsed
116     */
117    public String getLastTimestamp() {
118        return lastTimestamp;
119    }
120
121    /**
122     * Returns the message that was last parsed.
123     *
124     * @return the message that was last parsed
125     */
126    public String getLastMessage() {
127        return lastMessage;
128    }
129
130    /**
131     * Closes the Reader.
132     *
133     * @throws IOException if the reader can't be closed
134     */
135    public void closeReader() throws IOException {
136        reader.close();
137    }
138
139    /**
140     * Reads the next line from the Reader and checks if it matches the filter.
141     * It can also handle multi-line messages (i.e. exception stack traces). If
142     * it returns null, then there are no lines left in the Reader.
143     *
144     * @return LogLine
145     * @throws IOException in case of an error in the Reader
146     */
147    protected LogLine parseNextLogLine() throws IOException {
148        String line;
149        LogLine logLine = new LogLine();
150        while ((line = reader.readLine()) != null) {
151            logLine.setLine(line);
152            logLine.setLogParts(null);
153            filter.splitLogMessage(logLine, splitPattern);
154            // check the splits if logLine matches with the splitPattern
155            // Otherwise, go with previous patternMatched value. This is needed
156            // in parsing stack trace
157            patternMatched = logLine.getMatchedPattern() == MATCHED_PATTERN.NONE ? patternMatched
158                    : filter.splitsMatches(logLine);
159            if (patternMatched) {
160                if (filter.getLogLimit() != -1) {
161                    if (logLine.getLogParts() != null) {
162                        if (count >= filter.getLogLimit()) {
163                            return null;
164                        }
165                        count++;
166                    }
167                }
168                if (logLine.getLogParts() != null) {
169                    if (filter.getEndDate() != null) {
170                        // Ignore the milli second part
171                        if (logLine.getLogParts().get(0).substring(0, 19).compareTo(filter.getFormattedEndDate()) > 0)
172                            return null;
173                    }
174                }
175                return logLine;
176            }
177        }
178        logLine.setLine(null);
179        return logLine;
180    }
181
182    /**
183     * Parses the timestamp out of the passed in line. If there isn't one, it
184     * returns null.
185     *
186     * @param logLine The LogLine to check
187     * @return the timestamp of the line, or null
188     */
189    private String parseTimestamp(LogLine logLine) {
190        String timestamp = null;
191        if (logLine != null && logLine.getLogParts() != null && logLine.getLogParts().size() > 0) {
192            timestamp = logLine.getLogParts().get(0);
193        }
194        return timestamp;
195    }
196
197    /**
198     * Streams log messages to the passed in Writer, with zero bytes already
199     * written
200     *
201     * @param writer the target writer
202     * @param logStreamer the log streamer
203     * @throws IOException in case of IO error
204     */
205    public void processRemaining(Writer writer, XLogStreamer logStreamer) throws IOException {
206        while (increment()) {
207            writer.write(lastMessage);
208            if (logStreamer.shouldFlushOutput(lastMessage.length())) {
209                writer.flush();
210            }
211        }
212        writer.flush();
213    }
214
215    /**
216     * Splits the log message into parts
217     *
218     * @param line the line to split
219     * @return List of log parts
220     */
221    protected ArrayList<String> splitLogMessage(String line) {
222        return filter.splitLogMessage(line);
223    }
224
225
226}