001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import java.io.File;
022import java.io.IOException;
023import java.io.Writer;
024import java.util.ArrayList;
025import java.util.Calendar;
026import java.util.Collections;
027import java.util.Date;
028import java.util.Map;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031import java.io.BufferedReader;
032
033import org.apache.commons.lang.StringUtils;
034import org.apache.oozie.client.rest.RestConstants;
035import org.apache.oozie.command.CommandException;
036import org.apache.oozie.service.ConfigurationService;
037import org.apache.oozie.service.Service;
038import org.apache.oozie.service.Services;
039import org.apache.oozie.service.XLogService;
040
041/**
042 * XLogStreamer streams the given log file to writer after applying the given filter.
043 */
044public class XLogStreamer {
045    private static XLog LOG = XLog.getLog(XLogStreamer.class);
046    protected static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService.";
047    public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
048
049    private String logFile;
050    private String logPath;
051    protected XLogFilter logFilter;
052    private long logRotation;
053    Map<String, String[]> requestParam;
054    protected int totalDataWritten;
055    protected int bufferLen;
056
057    public XLogStreamer(XLogFilter logFilter, String logPath, String logFile, long logRotationSecs) {
058        if (logFile == null) {
059            logFile = "oozie-app.log";
060        }
061
062        this.logFilter = logFilter;
063        this.logFile = logFile;
064        this.logPath = logPath;
065        this.logRotation = logRotationSecs * 1000l;
066        bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 4096);
067    }
068
069    public XLogStreamer(XLogFilter logFilter) {
070        this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class)
071                .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation());
072
073    }
074
075    public XLogStreamer(XLogFilter logFilter, Map<String, String[]> params) {
076        this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class)
077                .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation());
078        this.requestParam = params;
079
080    }
081
082
083    public XLogStreamer(Map<String, String[]> params) throws CommandException {
084        this(new XLogFilter(new XLogUserFilterParam(params)));
085        this.requestParam = params;
086    }
087
088    /**
089     * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
090     * applying the filters.
091     *
092     * @param writer the target writer
093     * @param startTime the start time
094     * @param endTime the end time
095     * @throws IOException Signals that an I/O exception has occurred.
096     */
097    public void streamLog(Writer writer, Date startTime, Date endTime) throws IOException {
098        streamLog(writer, startTime, endTime, true);
099    }
100
101    /**
102     * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
103     * applying the filters
104     *
105     * @param writer the writer
106     * @param startTime the start time
107     * @param endTime the end time
108     * @param appendDebug the append debug
109     * @throws IOException Signals that an I/O exception has occurred.
110     */
111    public void streamLog(Writer writer, Date startTime, Date endTime, boolean appendDebug) throws IOException {
112        // Get a Reader for the log file(s)
113        BufferedReader reader = new BufferedReader(getReader(startTime, endTime));
114        try {
115            if (appendDebug) {
116                if (!StringUtils.isEmpty(logFilter.getTruncatedMessage())) {
117                    writer.write(logFilter.getTruncatedMessage());
118                }
119                if (logFilter.isDebugMode()) {
120                    writer.write(logFilter.getDebugMessage());
121                }
122            }
123            // Process the entire logs from the reader using the logFilter
124            new TimestampedMessageParser(reader, logFilter).processRemaining(writer, this);
125        }
126        finally {
127            reader.close();
128        }
129    }
130
131    /**
132     * Returns a BufferedReader configured to read the log files based on the given startTime and endTime.
133     *
134     * @param startTime the start time
135     * @param endTime the end time
136     * @return A BufferedReader for the log files
137     * @throws IOException Signals that an I/O exception has occurred.
138     */
139
140    private MultiFileReader getReader(Date startTime, Date endTime) throws IOException {
141        calculateAndValidateDateRange(startTime, endTime);
142        return new MultiFileReader(getFileList(logFilter.getStartDate(), logFilter.getEndDate()));
143    }
144
145    protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException {
146        logFilter.calculateAndCheckDates(startTime, endTime);
147        logFilter.validateDateRange(startTime, endTime);
148    }
149
150    public BufferedReader makeReader(Date startTime, Date endTime) throws IOException {
151        return new BufferedReader(getReader(startTime, endTime));
152    }
153
154    /**
155     * Gets the log file list for specific date range.
156     *
157     * @param startTime the start time
158     * @param endTime the end time
159     * @return log file list
160     * @throws IOException Signals that an I/O exception has occurred.
161     */
162    private ArrayList<File> getFileList(Date startTime, Date endTime) throws IOException {
163        long startTimeMillis = 0;
164        long endTimeMillis;
165        if (startTime != null) {
166            startTimeMillis = startTime.getTime();
167        }
168        if (endTime == null) {
169            endTimeMillis = System.currentTimeMillis();
170        }
171        else {
172            endTimeMillis = endTime.getTime();
173        }
174        File dir = new File(logPath);
175        return getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
176    }
177
178    /**
179     * File along with the modified time which will be used to sort later.
180     */
181    public class FileInfo implements Comparable<FileInfo> {
182        File file;
183        long modTime;
184
185        public FileInfo(File file, long modTime) {
186            this.file = file;
187            this.modTime = modTime;
188        }
189
190        public File getFile() {
191            return file;
192        }
193
194        public long getModTime() {
195            return modTime;
196        }
197
198        public int compareTo(FileInfo fileInfo) {
199            long diff = this.modTime - fileInfo.modTime;
200            if (diff > 0) {
201                return 1;
202            }
203            else if (diff < 0) {
204                return -1;
205            }
206            else {
207                return 0;
208            }
209        }
210    }
211
212    /**
213     * Gets the file list that will have the logs between startTime and endTime.
214     *
215     * @param dir the directory to list
216     * @param startTime the start time
217     * @param endTime the end time
218     * @param logRotationTime the log rotation time
219     * @param logFile the file to look up
220     * @return List of files to be streamed
221     */
222    private ArrayList<File> getFileList(File dir, long startTime, long endTime, long logRotationTime, String logFile) {
223        String[] children = dir.list();
224        ArrayList<FileInfo> fileList = new ArrayList<FileInfo>();
225        if (children == null) {
226            return new ArrayList<File>();
227        }
228        else {
229            for (int i = 0; i < children.length; i++) {
230                String fileName = children[i];
231                if (!fileName.startsWith(logFile) && !fileName.equals(logFile)) {
232                    continue;
233                }
234                File file = new File(dir.getAbsolutePath(), fileName);
235                if (fileName.endsWith(".gz")) {
236                    long gzFileCreationTime = getGZFileCreationTime(fileName, startTime, endTime);
237                    if (gzFileCreationTime != -1) {
238                        fileList.add(new FileInfo(file, gzFileCreationTime));
239                    }
240                    continue;
241                }
242                long modTime = file.lastModified();
243                if (modTime < startTime) {
244                    continue;
245                }
246                if (modTime / logRotationTime > (endTime / logRotationTime + 1)) {
247                    continue;
248                }
249                fileList.add(new FileInfo(file, modTime));
250            }
251        }
252        Collections.sort(fileList);
253        ArrayList<File> files = new ArrayList<File>(fileList.size());
254        for (FileInfo info : fileList) {
255            files.add(info.getFile());
256        }
257        return files;
258    }
259
260    /**
261     * This pattern matches the end of a gzip filename to have a format like "-YYYY-MM-dd-HH.gz" with capturing groups for each part
262     * of the date
263     */
264    public static final Pattern gzTimePattern = Pattern.compile(".*-(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d)-(\\d\\d)\\.gz");
265
266    /**
267     * Returns the creation time of the .gz archive if it is relevant to the job
268     *
269     * @param fileName
270     * @param startTime
271     * @param endTime
272     * @return Modification time of .gz file after checking if it is relevant to the job
273     */
274    private long getGZFileCreationTime(String fileName, long startTime, long endTime) {
275        // Default return value of -1 to exclude the file
276        long returnVal = -1;
277
278        // Include oozie.log as oozie.log.gz if it is accidentally GZipped
279        if (fileName.equals("oozie.log.gz")) {
280            LOG.warn("oozie.log has been GZipped, which is unexpected");
281            // Return a value other than -1 to include the file in list
282            returnVal = 0;
283        }
284        else {
285            Matcher m = gzTimePattern.matcher(fileName);
286            if (m.matches() && m.groupCount() == 4) {
287                int year = Integer.parseInt(m.group(1));
288                int month = Integer.parseInt(m.group(2));
289                int day = Integer.parseInt(m.group(3));
290                int hour = Integer.parseInt(m.group(4));
291                int minute = 0;
292                Calendar calendarEntry = Calendar.getInstance();
293                calendarEntry.set(year, month - 1, day, hour, minute); // give month-1(Say, 7 for August)
294                long logFileStartTime = calendarEntry.getTimeInMillis();
295                long milliSecondsPerHour = 3600000;
296                long logFileEndTime = logFileStartTime + milliSecondsPerHour;
297                /*  To check whether the log content is there in the initial or later part of the log file or
298                    the log content is contained entirely within this log file or
299                    the entire log file contains the event log where the event spans across hours
300                */
301                if ((startTime >= logFileStartTime && startTime <= logFileEndTime)
302                        || (endTime >= logFileStartTime && endTime <= logFileEndTime)
303                        || (startTime <= logFileStartTime && endTime >= logFileEndTime)) {
304                    returnVal = logFileStartTime;
305                }
306            }
307            else {
308                LOG.debug("Filename " + fileName + " does not match the expected format");
309                returnVal = -1;
310            }
311        }
312        return returnVal;
313    }
314
315    public boolean isLogEnabled() {
316        return Services.get().get(XLogService.class).getLogOverWS();
317    }
318
319    public String getLogType() {
320        return RestConstants.JOB_SHOW_LOG;
321    }
322
323    public XLogFilter getXLogFilter() {
324        return logFilter;
325    }
326
327    public String getLogDisableMessage() {
328        return "Log streaming disabled!!";
329    }
330
331    public Map<String, String[]> getRequestParam() {
332        return requestParam;
333    }
334
335    public boolean shouldFlushOutput(int writtenBytes) {
336        this.totalDataWritten += writtenBytes;
337        if (this.totalDataWritten > getBufferLen()) {
338            this.totalDataWritten = 0;
339            return true;
340        }
341        else {
342            return false;
343        }
344    }
345
346    public int getBufferLen() {
347        return bufferLen;
348    }
349}