This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.util;
019
020 import java.io.File;
021 import java.io.FileInputStream;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.Writer;
025 import java.util.ArrayList;
026 import java.util.Calendar;
027 import java.util.Collections;
028 import java.util.Date;
029 import java.util.HashMap;
030 import java.util.List;
031 import java.util.Map;
032 import java.util.regex.Matcher;
033 import java.util.regex.Pattern;
034 import java.util.zip.GZIPInputStream;
035
036 import com.google.common.annotations.VisibleForTesting;
037
038 /**
039 * XLogStreamer streams the given log file to logWriter after applying the given filter.
040 */
041 public class XLogStreamer {
042 private static XLog LOG = XLog.getLog(XLogStreamer.class);
043
044 /**
045 * Filter that will construct the regular expression that will be used to filter the log statement. And also checks
046 * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by
047 * "|") jobId appName actionId token
048 */
049 public static class Filter {
050 private Map<String, Integer> logLevels;
051 private final Map<String, String> filterParams;
052 private static List<String> parameters = new ArrayList<String>();
053 private boolean noFilter;
054 private Pattern filterPattern;
055
056 // TODO Patterns to be read from config file
057 private static final String DEFAULT_REGEX = "[^\\]]*";
058
059 public static final String ALLOW_ALL_REGEX = "(.*)";
060 private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
061 private static final String WHITE_SPACE_REGEX = "\\s+";
062 private static final String LOG_LEVEL_REGEX = "(\\w+)";
063 private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
064 + WHITE_SPACE_REGEX;
065 private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
066
067 public Filter() {
068 filterParams = new HashMap<String, String>();
069 for (int i = 0; i < parameters.size(); i++) {
070 filterParams.put(parameters.get(i), DEFAULT_REGEX);
071 }
072 logLevels = null;
073 noFilter = true;
074 filterPattern = null;
075 }
076
077 public void setLogLevel(String logLevel) {
078 if (logLevel != null && logLevel.trim().length() > 0) {
079 this.logLevels = new HashMap<String, Integer>();
080 String[] levels = logLevel.split("\\|");
081 for (int i = 0; i < levels.length; i++) {
082 String s = levels[i].trim().toUpperCase();
083 try {
084 XLog.Level.valueOf(s);
085 }
086 catch (Exception ex) {
087 continue;
088 }
089 this.logLevels.put(levels[i].toUpperCase(), 1);
090 }
091 }
092 }
093
094 public void setParameter(String filterParam, String value) {
095 if (filterParams.containsKey(filterParam)) {
096 noFilter = false;
097 filterParams.put(filterParam, value);
098 }
099 }
100
101 public static void defineParameter(String filterParam) {
102 parameters.add(filterParam);
103 }
104
105 public boolean isFilterPresent() {
106 if (noFilter && logLevels == null) {
107 return false;
108 }
109 return true;
110 }
111
112 /**
113 * Checks if the logLevel and logMessage goes through the logFilter.
114 *
115 * @param logParts
116 * @return
117 */
118 public boolean matches(ArrayList<String> logParts) {
119 String logLevel = logParts.get(0);
120 String logMessage = logParts.get(1);
121 if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
122 Matcher logMatcher = filterPattern.matcher(logMessage);
123 return logMatcher.matches();
124 }
125 else {
126 return false;
127 }
128 }
129
130 /**
131 * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing logLevel and
132 * logMessage if the pattern matches i.e A new log statement, else returns null.
133 *
134 * @param logLine
135 * @return Array containing log level and log message
136 */
137 public ArrayList<String> splitLogMessage(String logLine) {
138 Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
139 if (splitter.matches()) {
140 ArrayList<String> logParts = new ArrayList<String>();
141 logParts.add(splitter.group(2));// log level
142 logParts.add(splitter.group(3));// Log Message
143 return logParts;
144 }
145 else {
146 return null;
147 }
148 }
149
150 /**
151 * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be
152 * assigned if no filters are set.
153 */
154 public void constructPattern() {
155 if (noFilter && logLevels == null) {
156 filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
157 return;
158 }
159 StringBuilder sb = new StringBuilder();
160 if (noFilter) {
161 sb.append("(.*)");
162 }
163 else {
164 sb.append("(.* ");
165 for (int i = 0; i < parameters.size(); i++) {
166 sb.append(parameters.get(i) + "\\[");
167 sb.append(filterParams.get(parameters.get(i)) + "\\] ");
168 }
169 sb.append(".*)");
170 }
171 filterPattern = Pattern.compile(sb.toString());
172 }
173
174 public static void reset() {
175 parameters.clear();
176 }
177
178 @VisibleForTesting
179 public final Map<String, String> getFilterParams() {
180 return filterParams;
181 }
182 }
183
184 private String logFile;
185 private String logPath;
186 private Filter logFilter;
187 private Writer logWriter;
188 private long logRotation;
189
190 public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) {
191 this.logWriter = logWriter;
192 this.logFilter = logFilter;
193 if (logFile == null) {
194 logFile = "oozie-app.log";
195 }
196 this.logFile = logFile;
197 this.logPath = logPath;
198 this.logRotation = logRotationSecs * 1000l;
199 }
200
201 /**
202 * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
203 * applying the filters.
204 *
205 * @param startTime
206 * @param endTime
207 * @throws IOException
208 */
209 public void streamLog(Date startTime, Date endTime) throws IOException {
210 long startTimeMillis = 0;
211 long endTimeMillis;
212 if (startTime != null) {
213 startTimeMillis = startTime.getTime();
214 }
215 if (endTime == null) {
216 endTimeMillis = System.currentTimeMillis();
217 }
218 else {
219 endTimeMillis = endTime.getTime();
220 }
221 File dir = new File(logPath);
222 ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
223 File file;
224 String fileName;
225 XLogReader logReader;
226 for (int i = 0; i < fileList.size(); i++) {
227 fileName = fileList.get(i).getFileName();
228 if (fileName.endsWith(".gz")) {
229 file = new File(fileName);
230 GZIPInputStream gzipInputStream = null;
231 gzipInputStream = new GZIPInputStream(new FileInputStream(file));
232 logReader = new XLogReader(gzipInputStream, logFilter, logWriter);
233 logReader.processLog();
234 logReader.close();
235 continue;
236 }
237 InputStream ifs;
238 ifs = new FileInputStream(fileName);
239 logReader = new XLogReader(ifs, logFilter, logWriter);
240 logReader.processLog();
241 ifs.close();
242 }
243 }
244
245 /**
246 * File name along with the modified time which will be used to sort later.
247 */
248 class FileInfo implements Comparable<FileInfo> {
249 String fileName;
250 long modTime;
251
252 public FileInfo(String fileName, long modTime) {
253 this.fileName = fileName;
254 this.modTime = modTime;
255 }
256
257 public String getFileName() {
258 return fileName;
259 }
260
261 public long getModTime() {
262 return modTime;
263 }
264
265 public int compareTo(FileInfo fileInfo) {
266 long diff = this.modTime - fileInfo.modTime;
267 if (diff > 0) {
268 return 1;
269 }
270 else if (diff < 0) {
271 return -1;
272 }
273 else {
274 return 0;
275 }
276 }
277 }
278
279 /**
280 * Gets the file list that will have the logs between startTime and endTime.
281 *
282 * @param dir
283 * @param startTime
284 * @param endTime
285 * @param logRotationTime
286 * @param logFile
287 * @return List of files to be streamed
288 */
289 private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) {
290 String[] children = dir.list();
291 ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
292 if (children == null) {
293 return fileList;
294 }
295 else {
296 for (int i = 0; i < children.length; i++) {
297 String fileName = children[i];
298 if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) {
299 continue;
300 }
301 File file = new File(dir.getAbsolutePath(), fileName);
302 if (fileName.endsWith(".gz")) {
303 long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime);
304 if (gzFileCreationTime != -1) {
305 fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime));
306 }
307 continue;
308 }
309 long modTime = file.lastModified();
310 if (modTime < startTime) {
311 continue;
312 }
313 if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
314 continue;
315 }
316 fileList.add(new FileInfo(file.getAbsolutePath(), modTime));
317 }
318 }
319 Collections.sort(fileList);
320 return fileList;
321 }
322
323 /**
324 * This pattern matches the end of a gzip filename to have a format like "-YYYY-MM-dd-HH.gz" with capturing groups for each part
325 * of the date
326 */
327 public static final Pattern gzTimePattern = Pattern.compile(".*-(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)-(\\d\\d)\\.gz");
328
329 /**
330 * Returns the creation time of the .gz archive if it is relevant to the job
331 *
332 * @param fileName
333 * @param startTime
334 * @param endTime
335 * @return Modification time of .gz file after checking if it is relevant to the job
336 */
337 private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
338 // Default return value of -1 to exclude the file
339 long returnVal = -1;
340
341 // Include oozie.log as oozie.log.gz if it is accidentally GZipped
342 if (fileName.equals("oozie.log.gz")) {
343 LOG.warn("oozie.log has been GZipped, which is unexpected");
344 // Return a value other than -1 to include the file in list
345 returnVal = 0;
346 } else {
347 Matcher m = gzTimePattern.matcher(fileName);
348 if (m.matches() && m.groupCount() == 4) {
349 int year = Integer.parseInt(m.group(1));
350 int month = Integer.parseInt(m.group(2));
351 int day = Integer.parseInt(m.group(3));
352 int hour = Integer.parseInt(m.group(4));
353 int minute = 0;
354 Calendar calendarEntry = Calendar.getInstance();
355 calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
356 long logFileStartTime = calendarEntry.getTimeInMillis();
357 long milliSecondsPerHour = 3600000;
358 long logFileEndTime = logFileStartTime + milliSecondsPerHour;
359 /* To check whether the log content is there in the initial or later part of the log file or
360 the log content is contained entirely within this log file or
361 the entire log file contains the event log where the event spans across hours
362 */
363 if ((startTime >= logFileStartTime && startTime <= logFileEndTime)
364 || (endTime >= logFileStartTime && endTime <= logFileEndTime)
365 || (startTime <= logFileStartTime && endTime >= logFileEndTime)) {
366 returnVal = logFileStartTime;
367 }
368 } else {
369 LOG.debug("Filename " + fileName + " does not match the expected format");
370 returnVal = -1;
371 }
372 }
373 return returnVal;
374 }
375 }