001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.util;
019    
020    import java.io.File;
021    import java.io.FileInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.Writer;
025    import java.util.ArrayList;
026    import java.util.Calendar;
027    import java.util.Collections;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.List;
031    import java.util.Map;
032    import java.util.regex.Matcher;
033    import java.util.regex.Pattern;
034    import java.util.zip.GZIPInputStream;
035    
036    import com.google.common.annotations.VisibleForTesting;
037    
038    /**
039     * XLogStreamer streams the given log file to logWriter after applying the given filter.
040     */
041    public class XLogStreamer {
042        private static XLog LOG = XLog.getLog(XLogStreamer.class);
043    
044        /**
045         * Filter that will construct the regular expression that will be used to filter the log statement. And also checks
046         * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by
047         * "|") jobId appName actionId token
048         */
049        public static class Filter {
050            private Map<String, Integer> logLevels;
051            private final Map<String, String> filterParams;
052            private static List<String> parameters = new ArrayList<String>();
053            private boolean noFilter;
054            private Pattern filterPattern;
055    
056            // TODO Patterns to be read from config file
057            private static final String DEFAULT_REGEX = "[^\\]]*";
058    
059            public static final String ALLOW_ALL_REGEX = "(.*)";
060            private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
061            private static final String WHITE_SPACE_REGEX = "\\s+";
062            private static final String LOG_LEVEL_REGEX = "(\\w+)";
063            private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
064                    + WHITE_SPACE_REGEX;
065            private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
066    
067            public Filter() {
068                filterParams = new HashMap<String, String>();
069                for (int i = 0; i < parameters.size(); i++) {
070                    filterParams.put(parameters.get(i), DEFAULT_REGEX);
071                }
072                logLevels = null;
073                noFilter = true;
074                filterPattern = null;
075            }
076    
077            public void setLogLevel(String logLevel) {
078                if (logLevel != null && logLevel.trim().length() > 0) {
079                    this.logLevels = new HashMap<String, Integer>();
080                    String[] levels = logLevel.split("\\|");
081                    for (int i = 0; i < levels.length; i++) {
082                        String s = levels[i].trim().toUpperCase();
083                        try {
084                            XLog.Level.valueOf(s);
085                        }
086                        catch (Exception ex) {
087                            continue;
088                        }
089                        this.logLevels.put(levels[i].toUpperCase(), 1);
090                    }
091                }
092            }
093    
094            public void setParameter(String filterParam, String value) {
095                if (filterParams.containsKey(filterParam)) {
096                    noFilter = false;
097                    filterParams.put(filterParam, value);
098                }
099            }
100    
101            public static void defineParameter(String filterParam) {
102                parameters.add(filterParam);
103            }
104    
105            public boolean isFilterPresent() {
106                if (noFilter && logLevels == null) {
107                    return false;
108                }
109                return true;
110            }
111    
112            /**
113             * Checks if the logLevel and logMessage goes through the logFilter.
114             *
115             * @param logParts
116             * @return
117             */
118            public boolean matches(ArrayList<String> logParts) {
119                String logLevel = logParts.get(0);
120                String logMessage = logParts.get(1);
121                if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
122                    Matcher logMatcher = filterPattern.matcher(logMessage);
123                    return logMatcher.matches();
124                }
125                else {
126                    return false;
127                }
128            }
129    
130            /**
131             * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing logLevel and
132             * logMessage if the pattern matches i.e A new log statement, else returns null.
133             *
134             * @param logLine
135             * @return Array containing log level and log message
136             */
137            public ArrayList<String> splitLogMessage(String logLine) {
138                Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
139                if (splitter.matches()) {
140                    ArrayList<String> logParts = new ArrayList<String>();
141                    logParts.add(splitter.group(2));// log level
142                    logParts.add(splitter.group(3));// Log Message
143                    return logParts;
144                }
145                else {
146                    return null;
147                }
148            }
149    
150            /**
151             * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be
152             * assigned if no filters are set.
153             */
154            public void constructPattern() {
155                if (noFilter && logLevels == null) {
156                    filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
157                    return;
158                }
159                StringBuilder sb = new StringBuilder();
160                if (noFilter) {
161                    sb.append("(.*)");
162                }
163                else {
164                    sb.append("(.* ");
165                    for (int i = 0; i < parameters.size(); i++) {
166                        sb.append(parameters.get(i) + "\\[");
167                        sb.append(filterParams.get(parameters.get(i)) + "\\] ");
168                    }
169                    sb.append(".*)");
170                }
171                filterPattern = Pattern.compile(sb.toString());
172            }
173    
174            public static void reset() {
175                parameters.clear();
176            }
177    
178            @VisibleForTesting
179            public final Map<String, String> getFilterParams() {
180              return filterParams;
181            }
182        }
183    
184        private String logFile;
185        private String logPath;
186        private Filter logFilter;
187        private Writer logWriter;
188        private long logRotation;
189    
190        public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) {
191            this.logWriter = logWriter;
192            this.logFilter = logFilter;
193            if (logFile == null) {
194                logFile = "oozie-app.log";
195            }
196            this.logFile = logFile;
197            this.logPath = logPath;
198            this.logRotation = logRotationSecs * 1000l;
199        }
200    
201        /**
202         * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
203         * applying the filters.
204         *
205         * @param startTime
206         * @param endTime
207         * @throws IOException
208         */
209        public void streamLog(Date startTime, Date endTime) throws IOException {
210            long startTimeMillis = 0;
211            long endTimeMillis;
212            if (startTime != null) {
213                startTimeMillis = startTime.getTime();
214            }
215            if (endTime == null) {
216                endTimeMillis = System.currentTimeMillis();
217            }
218            else {
219                endTimeMillis = endTime.getTime();
220            }
221            File dir = new File(logPath);
222            ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
223            File file;
224            String fileName;
225            XLogReader logReader;
226            for (int i = 0; i < fileList.size(); i++) {
227                fileName = fileList.get(i).getFileName();
228                if (fileName.endsWith(".gz")) {
229                    file = new File(fileName);
230                    GZIPInputStream gzipInputStream = null;
231                    gzipInputStream = new GZIPInputStream(new FileInputStream(file));
232                    logReader = new XLogReader(gzipInputStream, logFilter, logWriter);
233                    logReader.processLog();
234                    logReader.close();
235                    continue;
236                }
237                InputStream ifs;
238                ifs = new FileInputStream(fileName);
239                logReader = new XLogReader(ifs, logFilter, logWriter);
240                logReader.processLog();
241                ifs.close();
242            }
243        }
244    
245        /**
246         * File name along with the modified time which will be used to sort later.
247         */
248        class FileInfo implements Comparable<FileInfo> {
249            String fileName;
250            long modTime;
251    
252            public FileInfo(String fileName, long modTime) {
253                this.fileName = fileName;
254                this.modTime = modTime;
255            }
256    
257            public String getFileName() {
258                return fileName;
259            }
260    
261            public long getModTime() {
262                return modTime;
263            }
264    
265            public int compareTo(FileInfo fileInfo) {
266                long diff = this.modTime - fileInfo.modTime;
267                if (diff > 0) {
268                    return 1;
269                }
270                else if (diff < 0) {
271                    return -1;
272                }
273                else {
274                    return 0;
275                }
276            }
277        }
278    
279        /**
280         * Gets the file list that will have the logs between startTime and endTime.
281         *
282         * @param dir
283         * @param startTime
284         * @param endTime
285         * @param logRotationTime
286         * @param logFile
287         * @return List of files to be streamed
288         */
289        private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) {
290            String[] children = dir.list();
291            ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
292            if (children == null) {
293                return fileList;
294            }
295            else {
296                for (int i = 0; i < children.length; i++) {
297                    String fileName = children[i];
298                    if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) {
299                        continue;
300                    }
301                    File file = new File(dir.getAbsolutePath(), fileName);
302                    if (fileName.endsWith(".gz")) {
303                        long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime);
304                        if (gzFileCreationTime != -1) {
305                            fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime));
306                        }
307                        continue;
308                    }
309                    long modTime = file.lastModified();
310                    if (modTime < startTime) {
311                        continue;
312                    }
313                    if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
314                        continue;
315                    }
316                    fileList.add(new FileInfo(file.getAbsolutePath(), modTime));
317                }
318            }
319            Collections.sort(fileList);
320            return fileList;
321        }
322    
323        /**
324         * This pattern matches the end of a gzip filename to have a format like "-YYYY-MM-dd-HH.gz" with capturing groups for each part
325         * of the date
326         */
327        public static final Pattern gzTimePattern = Pattern.compile(".*-(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)-(\\d\\d)\\.gz");
328    
329        /**
330         * Returns the creation time of the .gz archive if it is relevant to the job
331         *
332         * @param fileName
333         * @param startTime
334         * @param endTime
335         * @return Modification time of .gz file after checking if it is relevant to the job
336         */
337        private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
338            // Default return value of -1 to exclude the file
339            long returnVal = -1;
340    
341            // Include oozie.log as oozie.log.gz if it is accidentally GZipped
342            if (fileName.equals("oozie.log.gz")) {
343                LOG.warn("oozie.log has been GZipped, which is unexpected");
344                // Return a value other than -1 to include the file in list
345                returnVal = 0;
346            } else {
347                Matcher m = gzTimePattern.matcher(fileName);
348                if (m.matches() && m.groupCount() == 4) {
349                    int year = Integer.parseInt(m.group(1));
350                    int month = Integer.parseInt(m.group(2));
351                    int day = Integer.parseInt(m.group(3));
352                    int hour = Integer.parseInt(m.group(4));
353                    int minute = 0;
354                    Calendar calendarEntry = Calendar.getInstance();
355                    calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
356                    long logFileStartTime = calendarEntry.getTimeInMillis();
357                    long milliSecondsPerHour = 3600000;
358                    long logFileEndTime = logFileStartTime + milliSecondsPerHour;
359                    /*  To check whether the log content is there in the initial or later part of the log file or
360                        the log content is contained entirely within this log file or
361                        the entire log file contains the event log where the event spans across hours
362                    */
363                    if ((startTime >= logFileStartTime && startTime <= logFileEndTime)
364                            || (endTime >= logFileStartTime && endTime <= logFileEndTime)
365                            || (startTime <= logFileStartTime && endTime >= logFileEndTime)) {
366                        returnVal = logFileStartTime;
367                    }
368                } else {
369                    LOG.debug("Filename " + fileName + " does not match the expected format");
370                    returnVal = -1;
371                }
372            }
373            return returnVal;
374        }
375    }