001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import org.apache.oozie.util.XLogFilter;
021import org.apache.oozie.util.Instrumentable;
022import org.apache.oozie.util.Instrumentation;
023import org.apache.oozie.util.XLogStreamer;
024
025import java.io.IOException;
026import java.io.Writer;
027import java.util.Map;
028import java.util.Date;
029
030/**
031 * Service that performs streaming of log files over Web Services if enabled in XLogService
032 */
033public class XLogStreamingService implements Service, Instrumentable {
034    private static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService.";
035    public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
036
037    protected int bufferLen;
038
039    /**
040     * Initialize the log streaming service.
041     *
042     * @param services services instance.
043     * @throws ServiceException thrown if the log streaming service could not be initialized.
044     */
045    public void init(Services services) throws ServiceException {
046        bufferLen = services.getConf().getInt(STREAM_BUFFER_LEN, 4096);
047    }
048
049    /**
050     * Destroy the log streaming service.
051     */
052    public void destroy() {
053    }
054
055    /**
056     * Return the public interface for log streaming service.
057     *
058     * @return {@link XLogStreamingService}.
059     */
060    public Class<? extends Service> getInterface() {
061        return XLogStreamingService.class;
062    }
063
064    /**
065     * Instruments the log streaming service.
066     *
067     * @param instr instrumentation to use.
068     */
069    public void instrument(Instrumentation instr) {
070        // nothing to instrument
071    }
072
073    /**
074     * Stream the log of a job.
075     *
076     * @param filter log streamer filter.
077     * @param startTime start time for log events to filter.
078     * @param endTime end time for log events to filter.
079     * @param writer writer to stream the log to.
080     * @param params additional parameters from the request
081     * @throws IOException thrown if the log cannot be streamed.
082     */
083    public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
084            throws IOException {
085        XLogService xLogService = Services.get().get(XLogService.class);
086        if (xLogService.getLogOverWS()) {
087            new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
088                    xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
089        }
090        else {
091            writer.write("Log streaming disabled!!");
092        }
093    }
094
095    public int getBufferLen() {
096        return bufferLen;
097    }
098}