This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.util;
019
020 import java.io.File;
021 import java.io.FileInputStream;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.Writer;
025 import java.util.ArrayList;
026 import java.util.Calendar;
027 import java.util.Collections;
028 import java.util.Date;
029 import java.util.HashMap;
030 import java.util.List;
031 import java.util.Map;
032 import java.util.regex.Matcher;
033 import java.util.regex.Pattern;
034 import java.util.zip.GZIPInputStream;
035
036 /**
037 * XLogStreamer streams the given log file to logWriter after applying the given filter.
038 */
039 public class XLogStreamer {
040 private static XLog LOG = XLog.getLog(XLogStreamer.class);
041
042 /**
043 * Filter that will construct the regular expression that will be used to filter the log statement. And also checks
044 * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by
045 * "|") jobId appName actionId token
046 */
047 public static class Filter {
048 private Map<String, Integer> logLevels;
049 private Map<String, String> filterParams;
050 private static List<String> parameters = new ArrayList<String>();
051 private boolean noFilter;
052 private Pattern filterPattern;
053
054 // TODO Patterns to be read from config file
055 private static final String DEFAULT_REGEX = "[^\\]]*";
056
057 public static final String ALLOW_ALL_REGEX = "(.*)";
058 private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
059 private static final String WHITE_SPACE_REGEX = "\\s+";
060 private static final String LOG_LEVEL_REGEX = "(\\w+)";
061 private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
062 + WHITE_SPACE_REGEX;
063 private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
064
065 public Filter() {
066 filterParams = new HashMap<String, String>();
067 for (int i = 0; i < parameters.size(); i++) {
068 filterParams.put(parameters.get(i), DEFAULT_REGEX);
069 }
070 logLevels = null;
071 noFilter = true;
072 filterPattern = null;
073 }
074
075 public void setLogLevel(String logLevel) {
076 if (logLevel != null && logLevel.trim().length() > 0) {
077 this.logLevels = new HashMap<String, Integer>();
078 String[] levels = logLevel.split("\\|");
079 for (int i = 0; i < levels.length; i++) {
080 String s = levels[i].trim().toUpperCase();
081 try {
082 XLog.Level.valueOf(s);
083 }
084 catch (Exception ex) {
085 continue;
086 }
087 this.logLevels.put(levels[i].toUpperCase(), 1);
088 }
089 }
090 }
091
092 public void setParameter(String filterParam, String value) {
093 if (filterParams.containsKey(filterParam)) {
094 noFilter = false;
095 filterParams.put(filterParam, value);
096 }
097 }
098
099 public static void defineParameter(String filterParam) {
100 parameters.add(filterParam);
101 }
102
103 public boolean isFilterPresent() {
104 if (noFilter && logLevels == null) {
105 return false;
106 }
107 return true;
108 }
109
110 /**
111 * Checks if the logLevel and logMessage goes through the logFilter.
112 *
113 * @param logParts
114 * @return
115 */
116 public boolean matches(ArrayList<String> logParts) {
117 String logLevel = logParts.get(0);
118 String logMessage = logParts.get(1);
119 if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
120 Matcher logMatcher = filterPattern.matcher(logMessage);
121 return logMatcher.matches();
122 }
123 else {
124 return false;
125 }
126 }
127
128 /**
129 * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing logLevel and
130 * logMessage if the pattern matches i.e A new log statement, else returns null.
131 *
132 * @param logLine
133 * @return Array containing log level and log message
134 */
135 public ArrayList<String> splitLogMessage(String logLine) {
136 Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
137 if (splitter.matches()) {
138 ArrayList<String> logParts = new ArrayList<String>();
139 logParts.add(splitter.group(2));// log level
140 logParts.add(splitter.group(3));// Log Message
141 return logParts;
142 }
143 else {
144 return null;
145 }
146 }
147
148 /**
149 * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be
150 * assigned if no filters are set.
151 */
152 public void constructPattern() {
153 if (noFilter && logLevels == null) {
154 filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
155 return;
156 }
157 StringBuilder sb = new StringBuilder();
158 if (noFilter) {
159 sb.append("(.*)");
160 }
161 else {
162 sb.append("(.* ");
163 for (int i = 0; i < parameters.size(); i++) {
164 sb.append(parameters.get(i) + "\\[");
165 sb.append(filterParams.get(parameters.get(i)) + "\\] ");
166 }
167 sb.append(".*)");
168 }
169 filterPattern = Pattern.compile(sb.toString());
170 }
171
172 public static void reset() {
173 parameters.clear();
174 }
175 }
176
177 private String logFile;
178 private String logPath;
179 private Filter logFilter;
180 private Writer logWriter;
181 private long logRotation;
182
183 public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) {
184 this.logWriter = logWriter;
185 this.logFilter = logFilter;
186 if (logFile == null) {
187 logFile = "oozie-app.log";
188 }
189 this.logFile = logFile;
190 this.logPath = logPath;
191 this.logRotation = logRotationSecs * 1000l;
192 }
193
194 /**
195 * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
196 * applying the filters.
197 *
198 * @param startTime
199 * @param endTime
200 * @throws IOException
201 */
202 public void streamLog(Date startTime, Date endTime) throws IOException {
203 long startTimeMillis = 0;
204 long endTimeMillis;
205 if (startTime != null) {
206 startTimeMillis = startTime.getTime();
207 }
208 if (endTime == null) {
209 endTimeMillis = System.currentTimeMillis();
210 }
211 else {
212 endTimeMillis = endTime.getTime();
213 }
214 File dir = new File(logPath);
215 ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
216 File file;
217 String fileName;
218 XLogReader logReader;
219 for (int i = 0; i < fileList.size(); i++) {
220 fileName = fileList.get(i).getFileName();
221 if (fileName.endsWith(".gz")) {
222 file = new File(fileName);
223 GZIPInputStream gzipInputStream = null;
224 gzipInputStream = new GZIPInputStream(new FileInputStream(file));
225 logReader = new XLogReader(gzipInputStream, logFilter, logWriter);
226 logReader.processLog();
227 logReader.close();
228 continue;
229 }
230 InputStream ifs;
231 ifs = new FileInputStream(fileName);
232 logReader = new XLogReader(ifs, logFilter, logWriter);
233 logReader.processLog();
234 ifs.close();
235 }
236 }
237
238 /**
239 * File name along with the modified time which will be used to sort later.
240 */
241 class FileInfo implements Comparable<FileInfo> {
242 String fileName;
243 long modTime;
244
245 public FileInfo(String fileName, long modTime) {
246 this.fileName = fileName;
247 this.modTime = modTime;
248 }
249
250 public String getFileName() {
251 return fileName;
252 }
253
254 public long getModTime() {
255 return modTime;
256 }
257
258 public int compareTo(FileInfo fileInfo) {
259 long diff = this.modTime - fileInfo.modTime;
260 if (diff > 0) {
261 return 1;
262 }
263 else if (diff < 0) {
264 return -1;
265 }
266 else {
267 return 0;
268 }
269 }
270 }
271
272 /**
273 * Gets the file list that will have the logs between startTime and endTime.
274 *
275 * @param dir
276 * @param startTime
277 * @param endTime
278 * @param logRotationTime
279 * @param logFile
280 * @return List of files to be streamed
281 */
282 private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) {
283 String[] children = dir.list();
284 ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
285 if (children == null) {
286 return fileList;
287 }
288 else {
289 for (int i = 0; i < children.length; i++) {
290 String fileName = children[i];
291 if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) {
292 continue;
293 }
294 File file = new File(dir.getAbsolutePath(), fileName);
295 if (fileName.endsWith(".gz")) {
296 long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime);
297 if (gzFileCreationTime != -1) {
298 fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime));
299 }
300 continue;
301 }
302 long modTime = file.lastModified();
303 if (modTime < startTime) {
304 continue;
305 }
306 if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
307 continue;
308 }
309 fileList.add(new FileInfo(file.getAbsolutePath(), modTime));
310 }
311 }
312 Collections.sort(fileList);
313 return fileList;
314 }
315
316 /**
317 * This pattern matches the end of a gzip filename to have a format like "-YYYY-MM-dd-HH.gz" with capturing groups for each part
318 * of the date
319 */
320 public static final Pattern gzTimePattern = Pattern.compile(".*-(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)-(\\d\\d)\\.gz");
321
322 /**
323 * Returns the creation time of the .gz archive if it is relevant to the job
324 *
325 * @param fileName
326 * @param startTime
327 * @param endTime
328 * @return Modification time of .gz file after checking if it is relevant to the job
329 */
330 private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
331 // Default return value of -1 to exclude the file
332 long returnVal = -1;
333
334 // Include oozie.log as oozie.log.gz if it is accidentally GZipped
335 if (fileName.equals("oozie.log.gz")) {
336 LOG.warn("oozie.log has been GZipped, which is unexpected");
337 // Return a value other than -1 to include the file in list
338 returnVal = 0;
339 } else {
340 Matcher m = gzTimePattern.matcher(fileName);
341 if (m.matches() && m.groupCount() == 4) {
342 int year = Integer.parseInt(m.group(1));
343 int month = Integer.parseInt(m.group(2));
344 int day = Integer.parseInt(m.group(3));
345 int hour = Integer.parseInt(m.group(4));
346 int minute = 0;
347 Calendar calendarEntry = Calendar.getInstance();
348 calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
349 long logFileStartTime = calendarEntry.getTimeInMillis();
350 long milliSecondsPerHour = 3600000;
351 long logFileEndTime = logFileStartTime + milliSecondsPerHour;
352 /* To check whether the log content is there in the initial or later part of the log file or
353 the log content is contained entirely within this log file or
354 the entire log file contains the event log where the event spans across hours
355 */
356 if ((startTime >= logFileStartTime && startTime <= logFileEndTime)
357 || (endTime >= logFileStartTime && endTime <= logFileEndTime)
358 || (startTime <= logFileStartTime && endTime >= logFileEndTime)) {
359 returnVal = logFileStartTime;
360 }
361 } else {
362 LOG.debug("Filename " + fileName + " does not match the expected format");
363 returnVal = -1;
364 }
365 }
366 return returnVal;
367 }
368 }