001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.util;
019    
020    import java.io.File;
021    import java.io.FileInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.Writer;
025    import java.util.ArrayList;
026    import java.util.Calendar;
027    import java.util.Collections;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.List;
031    import java.util.Map;
032    import java.util.regex.Matcher;
033    import java.util.regex.Pattern;
034    import java.util.zip.GZIPInputStream;
035    
036    /**
037     * XLogStreamer streams the given log file to logWriter after applying the given filter.
038     */
039    public class XLogStreamer {
040        private static XLog LOG = XLog.getLog(XLogStreamer.class);
041    
042        /**
043         * Filter that will construct the regular expression that will be used to filter the log statement. And also checks
044         * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by
045         * "|") jobId appName actionId token
046         */
047        public static class Filter {
048            private Map<String, Integer> logLevels;
049            private Map<String, String> filterParams;
050            private static List<String> parameters = new ArrayList<String>();
051            private boolean noFilter;
052            private Pattern filterPattern;
053    
054            // TODO Patterns to be read from config file
055            private static final String DEFAULT_REGEX = "[^\\]]*";
056    
057            public static final String ALLOW_ALL_REGEX = "(.*)";
058            private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
059            private static final String WHITE_SPACE_REGEX = "\\s+";
060            private static final String LOG_LEVEL_REGEX = "(\\w+)";
061            private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
062                    + WHITE_SPACE_REGEX;
063            private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
064    
065            public Filter() {
066                filterParams = new HashMap<String, String>();
067                for (int i = 0; i < parameters.size(); i++) {
068                    filterParams.put(parameters.get(i), DEFAULT_REGEX);
069                }
070                logLevels = null;
071                noFilter = true;
072                filterPattern = null;
073            }
074    
075            public void setLogLevel(String logLevel) {
076                if (logLevel != null && logLevel.trim().length() > 0) {
077                    this.logLevels = new HashMap<String, Integer>();
078                    String[] levels = logLevel.split("\\|");
079                    for (int i = 0; i < levels.length; i++) {
080                        String s = levels[i].trim().toUpperCase();
081                        try {
082                            XLog.Level.valueOf(s);
083                        }
084                        catch (Exception ex) {
085                            continue;
086                        }
087                        this.logLevels.put(levels[i].toUpperCase(), 1);
088                    }
089                }
090            }
091    
092            public void setParameter(String filterParam, String value) {
093                if (filterParams.containsKey(filterParam)) {
094                    noFilter = false;
095                    filterParams.put(filterParam, value);
096                }
097            }
098    
099            public static void defineParameter(String filterParam) {
100                parameters.add(filterParam);
101            }
102    
103            public boolean isFilterPresent() {
104                if (noFilter && logLevels == null) {
105                    return false;
106                }
107                return true;
108            }
109    
110            /**
111             * Checks if the logLevel and logMessage goes through the logFilter.
112             *
113             * @param logParts
114             * @return
115             */
116            public boolean matches(ArrayList<String> logParts) {
117                String logLevel = logParts.get(0);
118                String logMessage = logParts.get(1);
119                if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
120                    Matcher logMatcher = filterPattern.matcher(logMessage);
121                    return logMatcher.matches();
122                }
123                else {
124                    return false;
125                }
126            }
127    
128            /**
129             * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing logLevel and
130             * logMessage if the pattern matches i.e A new log statement, else returns null.
131             *
132             * @param logLine
133             * @return Array containing log level and log message
134             */
135            public ArrayList<String> splitLogMessage(String logLine) {
136                Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
137                if (splitter.matches()) {
138                    ArrayList<String> logParts = new ArrayList<String>();
139                    logParts.add(splitter.group(2));// log level
140                    logParts.add(splitter.group(3));// Log Message
141                    return logParts;
142                }
143                else {
144                    return null;
145                }
146            }
147    
148            /**
149             * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be
150             * assigned if no filters are set.
151             */
152            public void constructPattern() {
153                if (noFilter && logLevels == null) {
154                    filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
155                    return;
156                }
157                StringBuilder sb = new StringBuilder();
158                if (noFilter) {
159                    sb.append("(.*)");
160                }
161                else {
162                    sb.append("(.* - ");
163                    for (int i = 0; i < parameters.size(); i++) {
164                        sb.append(parameters.get(i) + "\\[");
165                        sb.append(filterParams.get(parameters.get(i)) + "\\] ");
166                    }
167                    sb.append(".*)");
168                }
169                filterPattern = Pattern.compile(sb.toString());
170            }
171    
172            public static void reset() {
173                parameters.clear();
174            }
175        }
176    
177        private String logFile;
178        private String logPath;
179        private Filter logFilter;
180        private Writer logWriter;
181        private long logRotation;
182    
183        public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) {
184            this.logWriter = logWriter;
185            this.logFilter = logFilter;
186            if (logFile == null) {
187                logFile = "oozie-app.log";
188            }
189            this.logFile = logFile;
190            this.logPath = logPath;
191            this.logRotation = logRotationSecs * 1000l;
192        }
193    
194        /**
195         * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
196         * applying the filters.
197         *
198         * @param startTime
199         * @param endTime
200         * @throws IOException
201         */
202        public void streamLog(Date startTime, Date endTime) throws IOException {
203            long startTimeMillis = 0;
204            long endTimeMillis;
205            if (startTime != null) {
206                startTimeMillis = startTime.getTime();
207            }
208            if (endTime == null) {
209                endTimeMillis = System.currentTimeMillis();
210            }
211            else {
212                endTimeMillis = endTime.getTime();
213            }
214            File dir = new File(logPath);
215            ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
216            File file;
217            String fileName;
218            XLogReader logReader;
219            for (int i = 0; i < fileList.size(); i++) {
220                fileName = fileList.get(i).getFileName();
221                if (fileName.endsWith(".gz")) {
222                    file = new File(fileName);
223                    GZIPInputStream gzipInputStream = null;
224                    gzipInputStream = new GZIPInputStream(new FileInputStream(file));
225                    logReader = new XLogReader(gzipInputStream, logFilter, logWriter);
226                    logReader.processLog();
227                    logReader.close();
228                    continue;
229                }
230                InputStream ifs;
231                ifs = new FileInputStream(fileName);
232                logReader = new XLogReader(ifs, logFilter, logWriter);
233                logReader.processLog();
234                ifs.close();
235            }
236        }
237    
238        /**
239         * File name along with the modified time which will be used to sort later.
240         */
241        class FileInfo implements Comparable<FileInfo> {
242            String fileName;
243            long modTime;
244    
245            public FileInfo(String fileName, long modTime) {
246                this.fileName = fileName;
247                this.modTime = modTime;
248            }
249    
250            public String getFileName() {
251                return fileName;
252            }
253    
254            public long getModTime() {
255                return modTime;
256            }
257    
258            public int compareTo(FileInfo fileInfo) {
259                long diff = this.modTime - fileInfo.modTime;
260                if (diff > 0) {
261                    return 1;
262                }
263                else if (diff < 0) {
264                    return -1;
265                }
266                else {
267                    return 0;
268                }
269            }
270        }
271    
272        /**
273         * Gets the file list that will have the logs between startTime and endTime.
274         *
275         * @param dir
276         * @param startTime
277         * @param endTime
278         * @param logRotationTime
279         * @param logFile
280         * @return List of files to be streamed
281         */
282        private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) {
283            String[] children = dir.list();
284            ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
285            if (children == null) {
286                return fileList;
287            }
288            else {
289                for (int i = 0; i < children.length; i++) {
290                    String fileName = children[i];
291                    if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) {
292                        continue;
293                    }
294                    File file = new File(dir.getAbsolutePath(), fileName);
295                    if (fileName.endsWith(".gz")) {
296                        long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime);
297                        if (gzFileCreationTime != -1) {
298                            fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime));
299                        }
300                        continue;
301                    }
302                    long modTime = file.lastModified();
303                    if (modTime < startTime) {
304                        continue;
305                    }
306                    if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
307                        continue;
308                    }
309                    fileList.add(new FileInfo(file.getAbsolutePath(), modTime));
310                }
311            }
312            Collections.sort(fileList);
313            return fileList;
314        }
315        
316        /**
317         * This pattern matches the end of a gzip filename to have a format like "-YYYY-MM-dd-HH.gz" with capturing groups for each part
318         * of the date
319         */
320        public static final Pattern gzTimePattern = Pattern.compile(".*-(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)-(\\d\\d)\\.gz");
321    
322        /**
323         * Returns the creation time of the .gz archive if it is relevant to the job
324         *
325         * @param fileName
326         * @param startTime
327         * @param endTime
328         * @return Modification time of .gz file after checking if it is relevant to the job
329         */
330        private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
331            // Default return value of -1 to exclude the file
332            long returnVal = -1;
333            
334            // Include oozie.log as oozie.log.gz if it is accidentally GZipped
335            if (fileName.equals("oozie.log.gz")) {
336                LOG.warn("oozie.log has been GZipped, which is unexpected");
337                // Return a value other than -1 to include the file in list
338                returnVal = 0;
339            } else {
340                Matcher m = gzTimePattern.matcher(fileName);
341                if (m.matches() && m.groupCount() == 4) {
342                    int year = Integer.parseInt(m.group(1));
343                    int month = Integer.parseInt(m.group(2));
344                    int day = Integer.parseInt(m.group(3));
345                    int hour = Integer.parseInt(m.group(4));
346                    int minute = 0;
347                    Calendar calendarEntry = Calendar.getInstance();
348                    calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
349                    long logFileStartTime = calendarEntry.getTimeInMillis();
350                    long milliSecondsPerHour = 3600000;
351                    long logFileEndTime = logFileStartTime + milliSecondsPerHour;
352                    /*  To check whether the log content is there in the initial or later part of the log file or
353                        the log content is contained entirely within this log file or
354                        the entire log file contains the event log where the event spans across hours
355                    */
356                    if ((startTime >= logFileStartTime && startTime <= logFileEndTime)
357                            || (endTime >= logFileStartTime && endTime <= logFileEndTime)
358                            || (startTime <= logFileStartTime && endTime >= logFileEndTime)) {
359                        returnVal = logFileStartTime;
360                    }
361                } else {
362                    LOG.debug("Filename " + fileName + " does not match the expected format");
363                    returnVal = -1;
364                }
365            }
366            return returnVal;
367        }
368    }