001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.service; 020 021import org.apache.oozie.util.XLogFilter; 022import org.apache.oozie.util.Instrumentable; 023import org.apache.oozie.util.Instrumentation; 024import org.apache.oozie.util.XLogStreamer; 025 026import java.io.IOException; 027import java.io.Writer; 028import java.util.Map; 029import java.util.Date; 030 031/** 032 * Service that performs streaming of log files over Web Services if enabled in XLogService 033 */ 034public class XLogStreamingService implements Service, Instrumentable { 035 private static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService."; 036 public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len"; 037 038 protected int bufferLen; 039 040 /** 041 * Initialize the log streaming service. 042 * 043 * @param services services instance. 044 * @throws ServiceException thrown if the log streaming service could not be initialized. 045 */ 046 public void init(Services services) throws ServiceException { 047 bufferLen = ConfigurationService.getInt(services.getConf(), STREAM_BUFFER_LEN); 048 } 049 050 /** 051 * Destroy the log streaming service. 052 */ 053 public void destroy() { 054 } 055 056 /** 057 * Return the public interface for log streaming service. 058 * 059 * @return {@link XLogStreamingService}. 060 */ 061 public Class<? extends Service> getInterface() { 062 return XLogStreamingService.class; 063 } 064 065 /** 066 * Instruments the log streaming service. 067 * 068 * @param instr instrumentation to use. 069 */ 070 public void instrument(Instrumentation instr) { 071 // nothing to instrument 072 } 073 074 /** 075 * Stream the log of a job. 076 * 077 * @param filter log streamer filter. 078 * @param startTime start time for log events to filter. 079 * @param endTime end time for log events to filter. 080 * @param writer writer to stream the log to. 081 * @param params additional parameters from the request 082 * @throws IOException thrown if the log cannot be streamed. 083 */ 084 public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 085 throws IOException { 086 XLogService xLogService = Services.get().get(XLogService.class); 087 if (xLogService.getLogOverWS()) { 088 new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), 089 xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 090 } 091 else { 092 writer.write("Log streaming disabled!!"); 093 } 094 } 095 096 /** 097 * Stream the error log of a job. 098 * 099 * @param filter log streamer filter. 100 * @param startTime start time for log events to filter. 101 * @param endTime end time for log events to filter. 102 * @param writer writer to stream the log to. 103 * @param params additional parameters from the request 104 * @throws IOException thrown if the log cannot be streamed. 105 */ 106 public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 107 throws IOException { 108 XLogService xLogService = Services.get().get(XLogService.class); 109 if (xLogService.isErrorLogEnabled()) { 110 new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(), 111 xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 112 } 113 else { 114 writer.write("Error Log is disabled!!"); 115 } 116 } 117 118 /** 119 * Stream the audit log of a job. 120 * 121 * @param filter log streamer filter. 122 * @param startTime start time for log events to filter. 123 * @param endTime end time for log events to filter. 124 * @param writer writer to stream the log to. 125 * @param params additional parameters from the request 126 * @throws IOException thrown if the log cannot be streamed. 127 */ 128 public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 129 throws IOException { 130 XLogService xLogService = Services.get().get(XLogService.class); 131 if (xLogService.isAuditLogEnabled()) { 132 new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(), 133 xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 134 } 135 else { 136 writer.write("Audit Log is disabled!!"); 137 } 138 } 139 140 141 142 public int getBufferLen() { 143 return bufferLen; 144 } 145}