001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.io.BufferedReader; 021 022import org.apache.oozie.util.Instrumentable; 023import org.apache.oozie.util.Instrumentation; 024import org.apache.oozie.util.XLogStreamer; 025 026import java.io.IOException; 027import java.io.Writer; 028import java.util.ArrayList; 029import java.util.Date; 030import java.util.List; 031import java.util.Map; 032import java.util.TreeMap; 033 034import org.apache.curator.x.discovery.ServiceInstance; 035import org.apache.oozie.ErrorCode; 036import org.apache.oozie.client.OozieClient; 037import org.apache.oozie.client.rest.RestConstants; 038import org.apache.oozie.util.AuthUrlClient; 039import org.apache.oozie.util.XLogFilter; 040import org.apache.oozie.util.SimpleTimestampedMessageParser; 041import org.apache.oozie.util.TimestampedMessageParser; 042import org.apache.oozie.util.XLog; 043import org.apache.oozie.util.ZKUtils; 044 045/** 046 * Service that performs streaming of log files over Web Services if enabled in XLogService and collates logs from other Oozie 047 * servers. Requires that a ZooKeeper ensemble is available. 048 */ 049public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable { 050 051 private ZKUtils zk; 052 private XLog log; 053 054 /** 055 * Initialize the log streaming service. 056 * 057 * @param services services instance. 058 * @throws ServiceException thrown if the log streaming service could not be initialized. 059 */ 060 @Override 061 public void init(Services services) throws ServiceException { 062 super.init(services); 063 try { 064 zk = ZKUtils.register(this); 065 } 066 catch (Exception ex) { 067 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 068 } 069 log = XLog.getLog(this.getClass()); 070 } 071 072 /** 073 * Destroy the log streaming service. 074 */ 075 @Override 076 public void destroy() { 077 if (zk != null) { 078 zk.unregister(this); 079 } 080 zk = null; 081 super.destroy(); 082 } 083 084 /** 085 * Instruments the log streaming service. 086 * 087 * @param instr instrumentation to use. 088 */ 089 @Override 090 public void instrument(Instrumentation instr) { 091 super.instrument(instr); 092 } 093 094 /** 095 * Stream the log of a job. It contacts any other running Oozie servers to collate relevant logs while streaming. 096 * 097 * @param filter log streamer filter. 098 * @param startTime start time for log events to filter. 099 * @param endTime end time for log events to filter. 100 * @param writer writer to stream the log to. 101 * @param params additional parameters from the request 102 * @throws IOException thrown if the log cannot be streamed. 103 */ 104 @Override 105 public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 106 throws IOException { 107 XLogService xLogService = Services.get().get(XLogService.class); 108 if (xLogService.getLogOverWS()) { 109 // If ALL_SERVERS_PARAM is set to false, then only stream our log 110 if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { 111 new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), 112 xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 113 } 114 // Otherwise, we have to go collate relevant logs from the other Oozie servers 115 else { 116 collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(), 117 xLogService.getOozieLogName(), xLogService.getOozieLogRotation(), RestConstants.JOB_SHOW_LOG); 118 } 119 } 120 else { 121 writer.write("Log streaming disabled!!"); 122 } 123 } 124 125 /** 126 * Stream the error log of a job. It contacts any other running Oozie servers to collate relevant error logs while streaming. 127 * 128 * @param filter log streamer filter. 129 * @param startTime start time for log events to filter. 130 * @param endTime end time for log events to filter. 131 * @param writer writer to stream the log to. 132 * @param params additional parameters from the request 133 * @throws IOException thrown if the log cannot be streamed. 134 */ 135 public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 136 throws IOException { 137 XLogService xLogService = Services.get().get(XLogService.class); 138 if (xLogService.isErrorLogEnabled()) { 139 // If ALL_SERVERS_PARAM is set to false, then only stream our log 140 if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { 141 new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(), 142 xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 143 } 144 // Otherwise, we have to go collate relevant logs from the other Oozie servers 145 else { 146 collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(), 147 xLogService.getOozieErrorLogName(), xLogService.getOozieErrorLogRotation(), 148 RestConstants.JOB_SHOW_ERROR_LOG); 149 } 150 } 151 else { 152 writer.write("Error Log streaming disabled!!"); 153 } 154 } 155 156 /** 157 * Stream the audit log of a job. It contacts any other running Oozie servers to collate relevant audit logs while streaming. 158 * 159 * @param filter log streamer filter. 160 * @param startTime start time for log events to filter. 161 * @param endTime end time for log events to filter. 162 * @param writer writer to stream the log to. 163 * @param params additional parameters from the request 164 * @throws IOException thrown if the log cannot be streamed. 165 */ 166 @Override 167 public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 168 throws IOException { 169 XLogService xLogService = Services.get().get(XLogService.class); 170 if (xLogService.isAuditLogEnabled()) { 171 // If ALL_SERVERS_PARAM is set to false, then only stream our log 172 if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { 173 new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(), 174 xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 175 } 176 // Otherwise, we have to go collate relevant logs from the other Oozie servers 177 else { 178 collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieAuditLogPath(), 179 xLogService.getOozieAuditLogName(), xLogService.getOozieAuditLogRotation(), 180 RestConstants.JOB_SHOW_AUDIT_LOG); 181 } 182 } 183 else { 184 writer.write("Audit Log streaming disabled!!"); 185 } 186 } 187 188 189 190 /** 191 * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the 192 * Writer. It will make sure to not read all of the log messages into memory at the same time to not use up the heap. If there 193 * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it. 194 * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient. 195 * 196 * @param filter the job filter 197 * @param startTime the job start time 198 * @param endTime the job end time 199 * @param writer the writer 200 * @param params the params 201 * @param logPath the log path 202 * @param logName the log name 203 * @param rotation the rotation 204 * @param logType the log type 205 * @throws IOException Signals that an I/O exception has occurred. 206 */ 207 private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer, 208 Map<String, String[]> params, String logPath, String logName, int rotation, final String logType) throws IOException { 209 XLogService xLogService = Services.get().get(XLogService.class); 210 List<String> badOozies = new ArrayList<String>(); 211 List<ServiceInstance<Map>> oozies = null; 212 try { 213 oozies = zk.getAllMetaData(); 214 } 215 catch (Exception ex) { 216 throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex); 217 } 218 List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size()); 219 try { 220 // Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser 221 for (ServiceInstance<Map> oozie : oozies) { 222 Map<String, String> oozieMeta = oozie.getPayload(); 223 String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); 224 // If it's this server, we can just get them directly 225 if (otherId.equals(zk.getZKId())) { 226 BufferedReader reader = new XLogStreamer(filter, logPath, logName, rotation).makeReader(startTime, 227 endTime); 228 parsers.add(new TimestampedMessageParser(reader, filter)); 229 } 230 // If it's another server, we'll have to use the REST API 231 else { 232 String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); 233 String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB); 234 try { 235 // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie 236 // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion) 237 final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB 238 + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logType 239 + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params); 240 241 BufferedReader reader = AuthUrlClient.callServer(url); 242 parsers.add(new SimpleTimestampedMessageParser(reader, filter)); 243 } 244 catch(IOException ioe) { 245 log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId 246 + "] at [" + otherUrl + "]; log information may be incomplete", ioe); 247 badOozies.add(otherId); 248 } 249 } 250 } 251 252 //If log param debug is set, we need to write start date and end date to outputstream. 253 if(filter.isDebugMode()){ 254 writer.write(filter.getDebugMessage()); 255 } 256 257 // Add a message about any servers we couldn't contact 258 if (!badOozies.isEmpty()) { 259 writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n"); 260 for (String badOozie : badOozies) { 261 writer.write(" "); 262 writer.write(badOozie); 263 writer.write("\n"); 264 } 265 writer.write("\n"); 266 writer.flush(); 267 } 268 269 // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly 270 if (parsers.size() == 1) { 271 TimestampedMessageParser parser = parsers.get(0); 272 parser.processRemaining(writer, bufferLen); 273 } 274 else { 275 // Now that we have a Reader for each server to get the logs from that server, we have to collate them. Within each 276 // server, the logs should already be in the correct order, so we can take advantage of that. We'll use the 277 // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring 278 // every message into memory at the same time. 279 TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>(); 280 // populate timestampMap with initial values 281 for (TimestampedMessageParser parser : parsers) { 282 if (parser.increment()) { 283 timestampMap.put(parser.getLastTimestamp(), parser); 284 } 285 } 286 int bytesWritten = 0; 287 while (timestampMap.size() > 1) { 288 // The first entry will be the earliest based on the timestamp (also removes it) from the map 289 TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue(); 290 // Write the message from that parser at that timestamp 291 writer.write(earliestParser.getLastMessage()); 292 bytesWritten = earliestParser.getLastMessage().length(); 293 if (bytesWritten > bufferLen) { 294 writer.flush(); 295 bytesWritten = 0; 296 } 297 // Increment that parser to read the next message 298 if (earliestParser.increment()) { 299 // If it still has messages left, put it back in the map with the new last timestamp for it 300 timestampMap.put(earliestParser.getLastTimestamp(), earliestParser); 301 } 302 } 303 // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster 304 if (timestampMap.size() == 1) { 305 TimestampedMessageParser parser = timestampMap.values().iterator().next(); 306 writer.write(parser.getLastMessage()); // don't forget the last message read by the parser 307 parser.processRemaining(writer, bufferLen, bytesWritten + parser.getLastMessage().length()); 308 } 309 } 310 } 311 finally { 312 for (TimestampedMessageParser parser : parsers) { 313 parser.closeReader(); 314 } 315 writer.flush(); 316 } 317 } 318}