This project has been retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.util;
019
020 import java.io.File;
021 import java.io.FileInputStream;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.Writer;
025 import java.util.ArrayList;
026 import java.util.Calendar;
027 import java.util.Collections;
028 import java.util.Date;
029 import java.util.HashMap;
030 import java.util.List;
031 import java.util.Map;
032 import java.util.regex.Matcher;
033 import java.util.regex.Pattern;
034 import java.util.zip.GZIPInputStream;
035
036 /**
037 * XLogStreamer streams the given log file to logWriter after applying the given filter.
038 */
039 public class XLogStreamer {
040
041 /**
042 * Filter that will construct the regular expression that will be used to filter the log statement. And also checks
043 * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by
044 * "|") jobId appName actionId token
045 */
046 public static class Filter {
047 private Map<String, Integer> logLevels;
048 private Map<String, String> filterParams;
049 private static List<String> parameters = new ArrayList<String>();
050 private boolean noFilter;
051 private Pattern filterPattern;
052
053 // TODO Patterns to be read from config file
054 private static final String DEFAULT_REGEX = "[^\\]]*";
055
056 public static final String ALLOW_ALL_REGEX = "(.*)";
057 private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
058 private static final String WHITE_SPACE_REGEX = "\\s+";
059 private static final String LOG_LEVEL_REGEX = "(\\w+)";
060 private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
061 + WHITE_SPACE_REGEX;
062 private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
063
064 public Filter() {
065 filterParams = new HashMap<String, String>();
066 for (int i = 0; i < parameters.size(); i++) {
067 filterParams.put(parameters.get(i), DEFAULT_REGEX);
068 }
069 logLevels = null;
070 noFilter = true;
071 filterPattern = null;
072 }
073
074 public void setLogLevel(String logLevel) {
075 if (logLevel != null && logLevel.trim().length() > 0) {
076 this.logLevels = new HashMap<String, Integer>();
077 String[] levels = logLevel.split("\\|");
078 for (int i = 0; i < levels.length; i++) {
079 String s = levels[i].trim().toUpperCase();
080 try {
081 XLog.Level.valueOf(s);
082 }
083 catch (Exception ex) {
084 continue;
085 }
086 this.logLevels.put(levels[i].toUpperCase(), 1);
087 }
088 }
089 }
090
091 public void setParameter(String filterParam, String value) {
092 if (filterParams.containsKey(filterParam)) {
093 noFilter = false;
094 filterParams.put(filterParam, value);
095 }
096 }
097
098 public static void defineParameter(String filterParam) {
099 parameters.add(filterParam);
100 }
101
102 public boolean isFilterPresent() {
103 if (noFilter && logLevels == null) {
104 return false;
105 }
106 return true;
107 }
108
109 /**
110 * Checks if the logLevel and logMessage goes through the logFilter.
111 *
112 * @param logParts
113 * @return
114 */
115 public boolean matches(ArrayList<String> logParts) {
116 String logLevel = logParts.get(0);
117 String logMessage = logParts.get(1);
118 if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
119 Matcher logMatcher = filterPattern.matcher(logMessage);
120 return logMatcher.matches();
121 }
122 else {
123 return false;
124 }
125 }
126
127 /**
128 * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing logLevel and
129 * logMessage if the pattern matches i.e A new log statement, else returns null.
130 *
131 * @param logLine
132 * @return Array containing log level and log message
133 */
134 public ArrayList<String> splitLogMessage(String logLine) {
135 Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
136 if (splitter.matches()) {
137 ArrayList<String> logParts = new ArrayList<String>();
138 logParts.add(splitter.group(2));// log level
139 logParts.add(splitter.group(3));// Log Message
140 return logParts;
141 }
142 else {
143 return null;
144 }
145 }
146
147 /**
148 * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be
149 * assigned if no filters are set.
150 */
151 public void constructPattern() {
152 if (noFilter && logLevels == null) {
153 filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
154 return;
155 }
156 StringBuilder sb = new StringBuilder();
157 if (noFilter) {
158 sb.append("(.*)");
159 }
160 else {
161 sb.append("(.* - ");
162 for (int i = 0; i < parameters.size(); i++) {
163 sb.append(parameters.get(i) + "\\[");
164 sb.append(filterParams.get(parameters.get(i)) + "\\] ");
165 }
166 sb.append(".*)");
167 }
168 filterPattern = Pattern.compile(sb.toString());
169 }
170
171 public static void reset() {
172 parameters.clear();
173 }
174 }
175
176 private String logFile;
177 private String logPath;
178 private Filter logFilter;
179 private Writer logWriter;
180 private long logRotation;
181
182 public XLogStreamer(Filter logFilter, Writer logWriter, String logPath, String logFile, long logRotationSecs) {
183 this.logWriter = logWriter;
184 this.logFilter = logFilter;
185 if (logFile == null) {
186 logFile = "oozie-app.log";
187 }
188 this.logFile = logFile;
189 this.logPath = logPath;
190 this.logRotation = logRotationSecs * 1000l;
191 }
192
193 /**
194 * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
195 * applying the filters.
196 *
197 * @param startTime
198 * @param endTime
199 * @throws IOException
200 */
201 public void streamLog(Date startTime, Date endTime) throws IOException {
202 long startTimeMillis = 0;
203 long endTimeMillis;
204 if (startTime != null) {
205 startTimeMillis = startTime.getTime();
206 }
207 if (endTime == null) {
208 endTimeMillis = System.currentTimeMillis();
209 }
210 else {
211 endTimeMillis = endTime.getTime();
212 }
213 File dir = new File(logPath);
214 ArrayList<FileInfo> fileList = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
215 File file;
216 String fileName;
217 XLogReader logReader;
218 for (int i = 0; i < fileList.size(); i++) {
219 fileName = fileList.get(i).getFileName();
220 if (fileName.endsWith(".gz")) {
221 file = new File(fileName);
222 GZIPInputStream gzipInputStream = null;
223 gzipInputStream = new GZIPInputStream(new FileInputStream(file));
224 logReader = new XLogReader(gzipInputStream, logFilter, logWriter);
225 logReader.processLog();
226 logReader.close();
227 continue;
228 }
229 InputStream ifs;
230 ifs = new FileInputStream(fileName);
231 logReader = new XLogReader(ifs, logFilter, logWriter);
232 logReader.processLog();
233 ifs.close();
234 }
235 }
236
/**
 * File name along with the time used to order the files before streaming. Ordering is by ascending time.
 */
class FileInfo implements Comparable<FileInfo> {
    String fileName;
    long modTime;

    /**
     * @param fileName name (path) of the file
     * @param modTime time in milliseconds used as the sort key
     */
    public FileInfo(String fileName, long modTime) {
        this.fileName = fileName;
        this.modTime = modTime;
    }

    public String getFileName() {
        return fileName;
    }

    public long getModTime() {
        return modTime;
    }

    /**
     * Orders by time, oldest first.
     *
     * @param fileInfo entry to compare with
     * @return -1, 0 or 1 if this entry's time is less than, equal to or greater than the other entry's time
     */
    public int compareTo(FileInfo fileInfo) {
        // Compare directly instead of testing the sign of a difference: subtracting two longs can overflow
        // and report the wrong order.
        if (this.modTime < fileInfo.modTime) {
            return -1;
        }
        if (this.modTime > fileInfo.modTime) {
            return 1;
        }
        return 0;
    }
}
270
271 /**
272 * Gets the file list that will have the logs between startTime and endTime.
273 *
274 * @param dir
275 * @param startTime
276 * @param endTime
277 * @param logRotationTime
278 * @param logFile
279 * @return List of files to be streamed
280 */
281 private ArrayList<FileInfo> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) {
282 String[] children = dir.list();
283 ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
284 if (children == null) {
285 return fileList;
286 }
287 else {
288 for (int i = 0; i < children.length; i++) {
289 String fileName = children[i];
290 if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) {
291 continue;
292 }
293 File file = new File(dir.getAbsolutePath(), fileName);
294 if (fileName.endsWith(".gz")) {
295 long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime);
296 if (gzFileCreationTime != -1) {
297 fileList.add(new FileInfo(file.getAbsolutePath(), gzFileCreationTime));
298 }
299 continue;
300 }
301 long modTime = file.lastModified();
302 if (modTime < startTime) {
303 continue;
304 }
305 if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
306 continue;
307 }
308 fileList.add(new FileInfo(file.getAbsolutePath(), modTime));
309 }
310 }
311 Collections.sort(fileList);
312 return fileList;
313 }
314
315 /**
316 * Returns the creation time of the .gz archive if it is relevant to the job
317 *
318 * @param fileName
319 * @param startTime
320 * @param endTime
321 * @return Modification time of .gz file after checking if it is relevant to the job
322 */
323 private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
324 long returnVal = -1;
325 int dateStartIndex = 10;
326 String[] dateDetails;
327 dateDetails = fileName.substring(dateStartIndex, fileName.length() - 3).split("-");
328 int year = Integer.parseInt(dateDetails[0]);
329 int month = Integer.parseInt(dateDetails[1]);
330 int day = Integer.parseInt(dateDetails[2]);
331 int hour = Integer.parseInt(dateDetails[3]);
332 int minute = 0;
333 Calendar calendarEntry = Calendar.getInstance();
334 calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
335 long logFileStartTime = calendarEntry.getTimeInMillis();
336 long milliSecondsPerHour = 3600000;
337 long logFileEndTime = logFileStartTime + milliSecondsPerHour;
338 if ((startTime >= logFileStartTime && startTime <= logFileEndTime)
339 || (endTime >= logFileStartTime && endTime <= logFileEndTime)) {
340 returnVal = logFileStartTime;
341 }
342 return returnVal;
343 }
344 }