001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.service;
020
021import org.apache.oozie.util.XLogFilter;
022import org.apache.oozie.util.Instrumentable;
023import org.apache.oozie.util.Instrumentation;
024import org.apache.oozie.util.XLogStreamer;
025
026import java.io.IOException;
027import java.io.Writer;
028import java.util.Map;
029import java.util.Date;
030
031/**
032 * Service that performs streaming of log files over Web Services if enabled in XLogService
033 */
034public class XLogStreamingService implements Service, Instrumentable {
035    private static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService.";
036    public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
037
038    protected int bufferLen;
039
040    /**
041     * Initialize the log streaming service.
042     *
043     * @param services services instance.
044     * @throws ServiceException thrown if the log streaming service could not be initialized.
045     */
046    public void init(Services services) throws ServiceException {
047        bufferLen = ConfigurationService.getInt(services.getConf(), STREAM_BUFFER_LEN);
048    }
049
050    /**
051     * Destroy the log streaming service.
052     */
053    public void destroy() {
054    }
055
056    /**
057     * Return the public interface for log streaming service.
058     *
059     * @return {@link XLogStreamingService}.
060     */
061    public Class<? extends Service> getInterface() {
062        return XLogStreamingService.class;
063    }
064
065    /**
066     * Instruments the log streaming service.
067     *
068     * @param instr instrumentation to use.
069     */
070    public void instrument(Instrumentation instr) {
071        // nothing to instrument
072    }
073
074    /**
075     * Stream the log of a job.
076     *
077     * @param filter log streamer filter.
078     * @param startTime start time for log events to filter.
079     * @param endTime end time for log events to filter.
080     * @param writer writer to stream the log to.
081     * @param params additional parameters from the request
082     * @throws IOException thrown if the log cannot be streamed.
083     */
084    public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
085            throws IOException {
086        XLogService xLogService = Services.get().get(XLogService.class);
087        if (xLogService.getLogOverWS()) {
088            new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
089                    xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
090        }
091        else {
092            writer.write("Log streaming disabled!!");
093        }
094    }
095
096    /**
097     * Stream the error log of a job.
098     *
099     * @param filter log streamer filter.
100     * @param startTime start time for log events to filter.
101     * @param endTime end time for log events to filter.
102     * @param writer writer to stream the log to.
103     * @param params additional parameters from the request
104     * @throws IOException thrown if the log cannot be streamed.
105     */
106    public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
107            throws IOException {
108        XLogService xLogService = Services.get().get(XLogService.class);
109        if (xLogService.isErrorLogEnabled()) {
110            new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(),
111                    xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
112        }
113        else {
114            writer.write("Error Log is disabled!!");
115        }
116    }
117
118    /**
119     * Stream the audit log of a job.
120     *
121     * @param filter log streamer filter.
122     * @param startTime start time for log events to filter.
123     * @param endTime end time for log events to filter.
124     * @param writer writer to stream the log to.
125     * @param params additional parameters from the request
126     * @throws IOException thrown if the log cannot be streamed.
127     */
128    public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
129            throws IOException {
130        XLogService xLogService = Services.get().get(XLogService.class);
131        if (xLogService.isAuditLogEnabled()) {
132            new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(),
133                    xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
134        }
135        else {
136            writer.write("Audit Log is disabled!!");
137        }
138    }
139
140
141
142    public int getBufferLen() {
143        return bufferLen;
144    }
145}