001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.util;
019    
020    import java.io.File;
021    import java.io.FileInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.Writer;
025    import java.util.ArrayList;
026    import java.util.Calendar;
027    import java.util.Collections;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.List;
031    import java.util.Map;
032    import java.util.regex.Matcher;
033    import java.util.regex.Pattern;
034    import java.util.zip.GZIPInputStream;
035    
036    /**
037     * XLogStreamer streams the given log file to logWriter after applying the given filter.
038     */
039    public class XLogStreamer {
040    
041        /**
042         * Filter that will construct the regular expression that will be used to filter the log statement. And also checks
043         * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by
044         * "|") jobId appName actionId token
045         */
046        public static class Filter {
047            private Map<String, Integer> logLevels;
048            private Map<String, String> filterParams;
049            private static List<String> parameters = new ArrayList<String>();
050            private boolean noFilter;
051            private Pattern filterPattern;
052    
053            // TODO Patterns to be read from config file
054            private static final String DEFAULT_REGEX = "[^\\]]*";
055    
056            public static final String ALLOW_ALL_REGEX = "(.*)";
057            private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
058            private static final String WHITE_SPACE_REGEX = "\\s+";
059            private static final String LOG_LEVEL_REGEX = "(\\w+)";
060            private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
061                    + WHITE_SPACE_REGEX;
062            private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
063    
064            public Filter() {
065                filterParams = new HashMap<String, String>();
066                for (int i = 0; i < parameters.size(); i++) {
067                    filterParams.put(parameters.get(i), DEFAULT_REGEX);
068                }
069                logLevels = null;
070                noFilter = true;
071                filterPattern = null;
072            }
073    
074            public void setLogLevel(String logLevel) {
075                if (logLevel != null && logLevel.trim().length() > 0) {
076                    this.logLevels = new HashMap<String, Integer>();
077                    String[] levels = logLevel.split("\\|");
078                    for (int i = 0; i < levels.length; i++) {
079                        String s = levels[i].trim().toUpperCase();
080                        try {
081                            XLog.Level.valueOf(s);
082                        }
083                        catch (Exception ex) {
084                            continue;
085                        }
086                        this.logLevels.put(levels[i].toUpperCase(), 1);
087                    }
088                }
089            }
090    
091            public void setParameter(String filterParam, String value) {
092                if (filterParams.containsKey(filterParam)) {
093                    noFilter = false;
094                    filterParams.put(filterParam, value);
095                }
096            }
097    
098            public static void defineParameter(String filterParam) {
099                parameters.add(filterParam);
100            }
101    
102            public boolean isFilterPresent() {
103                if (noFilter && logLevels == null) {
104                    return false;
105                }
106                return true;
107            }
108    
109            /**
110             * Checks if the logLevel and logMessage goes through the logFilter.
111             *
112             * @param logParts
113             * @return
114             */
115            public boolean matches(ArrayList<String> logParts) {
116                String logLevel = logParts.get(0);
117                String logMessage = logParts.get(1);
118                if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
119                    Matcher logMatcher = filterPattern.matcher(logMessage);
120                    return logMatcher.matches();
121                }
122                else {
123                    return false;
124                }
125            }
126    
127            /**
128             * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing logLevel and
129             * logMessage if the pattern matches i.e A new log statement, else returns null.
130             *
131             * @param logLine
132             * @return Array containing log level and log message
133             */
134            public ArrayList<String> splitLogMessage(String logLine) {
135                Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
136                if (splitter.matches()) {
137                    ArrayList<String> logParts = new ArrayList<String>();
138                    logParts.add(splitter.group(2));// log level
139                    logParts.add(splitter.group(3));// Log Message
140                    return logParts;
141                }
142                else {
143                    return null;
144                }
145            }
146    
147            /**
148             * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be
149             * assigned if no filters are set.
150             */
151            public void constructPattern() {
152                if (noFilter && logLevels == null) {
153                    filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
154                    return;
155                }
156                StringBuilder sb = new StringBuilder();
157                if (noFilter) {
158                    sb.append("(.*)");
159                }
160                else {
161                    sb.append("(.* - ");
162                    for (int i = 0; i < parameters.size(); i++) {
163                        sb.append(parameters.get(i) + "\\[");
164                        sb.append(filterParams.get(parameters.get(i)) + "\\] ");
165                    }
166                    sb.append(".*)");
167                }
168                filterPattern = Pattern.compile(sb.toString());
169            }
170    
171            public static void reset() {
172                parameters.clear();
173            }
174        }
175    
176        private String logFile;
177        private String logPath;
178        private Filter logFilter;
179        private Writer logWriter;
180        private long logRotation;
181    
182        public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) {
183            this.logWriter = logWriter;
184            this.logFilter = logFilter;
185            if (logFile == null) {
186                logFile = "oozie-app.log";
187            }
188            this.logFile = logFile;
189            this.logPath = logPath;
190            this.logRotation = logRotationSecs * 1000l;
191        }
192    
193        /**
194         * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
195         * applying the filters.
196         *
197         * @param startTime
198         * @param endTime
199         * @throws IOException
200         */
201        public void streamLog(Date startTime, Date endTime) throws IOException {
202            long startTimeMillis = 0;
203            long endTimeMillis;
204            if (startTime != null) {
205                startTimeMillis = startTime.getTime();
206            }
207            if (endTime == null) {
208                endTimeMillis = System.currentTimeMillis();
209            }
210            else {
211                endTimeMillis = endTime.getTime();
212            }
213            File dir = new File(logPath);
214            ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
215            File file;
216            String fileName;
217            XLogReader logReader;
218            for (int i = 0; i < fileList.size(); i++) {
219                fileName = fileList.get(i).getFileName();
220                if (fileName.endsWith(".gz")) {
221                    file = new File(fileName);
222                    GZIPInputStream gzipInputStream = null;
223                    gzipInputStream = new GZIPInputStream(new FileInputStream(file));
224                    logReader = new XLogReader(gzipInputStream, logFilter, logWriter);
225                    logReader.processLog();
226                    logReader.close();
227                    continue;
228                }
229                InputStream ifs;
230                ifs = new FileInputStream(fileName);
231                logReader = new XLogReader(ifs, logFilter, logWriter);
232                logReader.processLog();
233                ifs.close();
234            }
235        }
236    
237        /**
238         * File name along with the modified time which will be used to sort later.
239         */
240        class FileInfo implements Comparable<FileInfo> {
241            String fileName;
242            long modTime;
243    
244            public FileInfo(String fileName, long modTime) {
245                this.fileName = fileName;
246                this.modTime = modTime;
247            }
248    
249            public String getFileName() {
250                return fileName;
251            }
252    
253            public long getModTime() {
254                return modTime;
255            }
256    
257            public int compareTo(FileInfo fileInfo) {
258                long diff = this.modTime - fileInfo.modTime;
259                if (diff > 0) {
260                    return 1;
261                }
262                else if (diff < 0) {
263                    return -1;
264                }
265                else {
266                    return 0;
267                }
268            }
269        }
270    
271        /**
272         * Gets the file list that will have the logs between startTime and endTime.
273         *
274         * @param dir
275         * @param startTime
276         * @param endTime
277         * @param logRotationTime
278         * @param logFile
279         * @return List of files to be streamed
280         */
281        private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) {
282            String[] children = dir.list();
283            ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
284            if (children == null) {
285                return fileList;
286            }
287            else {
288                for (int i = 0; i < children.length; i++) {
289                    String fileName = children[i];
290                    if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) {
291                        continue;
292                    }
293                    File file = new File(dir.getAbsolutePath(), fileName);
294                    if (fileName.endsWith(".gz")) {
295                        long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime);
296                        if (gzFileCreationTime != -1) {
297                            fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime));
298                        }
299                        continue;
300                    }
301                    long modTime = file.lastModified();
302                    if (modTime < startTime) {
303                        continue;
304                    }
305                    if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
306                        continue;
307                    }
308                    fileList.add(new FileInfo(file.getAbsolutePath(), modTime));
309                }
310            }
311            Collections.sort(fileList);
312            return fileList;
313        }
314    
315        /**
316         * Returns the creation time of the .gz archive if it is relevant to the job
317         *
318         * @param fileName
319         * @param startTime
320         * @param endTime
321         * @return Modification time of .gz file after checking if it is relevant to the job
322         */
323        private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
324            long returnVal = -1;
325            int dateStartIndex = 10;
326            String[] dateDetails;
327            dateDetails = fileName.substring(dateStartIndex, fileName.length() - 3).split("-");
328            int year = Integer.parseInt(dateDetails[0]);
329            int month = Integer.parseInt(dateDetails[1]);
330            int day = Integer.parseInt(dateDetails[2]);
331            int hour = Integer.parseInt(dateDetails[3]);
332            int minute = 0;
333            Calendar calendarEntry = Calendar.getInstance();
334            calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
335            long logFileStartTime = calendarEntry.getTimeInMillis();
336            long milliSecondsPerHour = 3600000;
337            long logFileEndTime = logFileStartTime + milliSecondsPerHour;
338            if ((startTime >= logFileStartTime && startTime <= logFileEndTime)
339                    || (endTime >= logFileStartTime && endTime <= logFileEndTime)) {
340                returnVal = logFileStartTime;
341            }
342            return returnVal;
343        }
344    }