001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.io.BufferedReader; 021import java.io.IOException; 022import java.io.Writer; 023import java.net.URLEncoder; 024import java.util.ArrayList; 025import java.util.Date; 026import java.util.List; 027import java.util.Map; 028import java.util.TreeMap; 029 030import org.apache.curator.x.discovery.ServiceInstance; 031import org.apache.oozie.ErrorCode; 032import org.apache.oozie.client.OozieClient; 033import org.apache.oozie.client.rest.RestConstants; 034import org.apache.oozie.util.AuthUrlClient; 035import org.apache.oozie.util.Instrumentable; 036import org.apache.oozie.util.Instrumentation; 037import org.apache.oozie.util.SimpleTimestampedMessageParser; 038import org.apache.oozie.util.TimestampedMessageParser; 039import org.apache.oozie.util.XLog; 040import org.apache.oozie.util.XLogFilter; 041import org.apache.oozie.util.XLogStreamer; 042import org.apache.oozie.util.ZKUtils; 043 044/** 045 * Service that performs streaming of log files over Web Services if enabled in XLogService and collates logs from other Oozie 046 * servers. Requires that a ZooKeeper ensemble is available. 047 */ 048public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable { 049 050 private ZKUtils zk; 051 private XLog log; 052 053 /** 054 * Initialize the log streaming service. 055 * 056 * @param services services instance. 057 * @throws ServiceException thrown if the log streaming service could not be initialized. 058 */ 059 @Override 060 public void init(Services services) throws ServiceException { 061 super.init(services); 062 try { 063 zk = ZKUtils.register(this); 064 } 065 catch (Exception ex) { 066 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 067 } 068 log = XLog.getLog(this.getClass()); 069 } 070 071 /** 072 * Destroy the log streaming service. 073 */ 074 @Override 075 public void destroy() { 076 if (zk != null) { 077 zk.unregister(this); 078 } 079 zk = null; 080 super.destroy(); 081 } 082 083 /** 084 * Instruments the log streaming service. 085 * 086 * @param instr instrumentation to use. 087 */ 088 @Override 089 public void instrument(Instrumentation instr) { 090 super.instrument(instr); 091 } 092 093 /** 094 * Stream the log of a job. It contacts any other running Oozie servers to collate relevant logs while streaming. 095 * 096 * @param filter log streamer filter. 097 * @param startTime start time for log events to filter. 098 * @param endTime end time for log events to filter. 099 * @param writer writer to stream the log to. 100 * @param params additional parameters from the request 101 * @throws IOException thrown if the log cannot be streamed. 102 */ 103 @Override 104 public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 105 throws IOException { 106 XLogService xLogService = Services.get().get(XLogService.class); 107 if (xLogService.getLogOverWS()) { 108 // If ALL_SERVERS_PARAM is set to false, then only stream our log 109 if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { 110 new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), 111 xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 112 } 113 // Otherwise, we have to go collate relevant logs from the other Oozie servers 114 else { 115 collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(), 116 xLogService.getOozieLogName(), xLogService.getOozieLogRotation(), RestConstants.JOB_SHOW_LOG); 117 } 118 } 119 else { 120 writer.write("Log streaming disabled!!"); 121 } 122 } 123 124 /** 125 * Stream the error log of a job. It contacts any other running Oozie servers to collate relevant error logs while streaming. 126 * 127 * @param filter log streamer filter. 128 * @param startTime start time for log events to filter. 129 * @param endTime end time for log events to filter. 130 * @param writer writer to stream the log to. 131 * @param params additional parameters from the request 132 * @throws IOException thrown if the log cannot be streamed. 133 */ 134 public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 135 throws IOException { 136 XLogService xLogService = Services.get().get(XLogService.class); 137 if (xLogService.isErrorLogEnabled()) { 138 // If ALL_SERVERS_PARAM is set to false, then only stream our log 139 if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { 140 new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(), 141 xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 142 } 143 // Otherwise, we have to go collate relevant logs from the other Oozie servers 144 else { 145 collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(), 146 xLogService.getOozieErrorLogName(), xLogService.getOozieErrorLogRotation(), 147 RestConstants.JOB_SHOW_ERROR_LOG); 148 } 149 } 150 else { 151 writer.write("Error Log streaming disabled!!"); 152 } 153 } 154 155 /** 156 * Stream the audit log of a job. It contacts any other running Oozie servers to collate relevant audit logs while streaming. 157 * 158 * @param filter log streamer filter. 159 * @param startTime start time for log events to filter. 160 * @param endTime end time for log events to filter. 161 * @param writer writer to stream the log to. 162 * @param params additional parameters from the request 163 * @throws IOException thrown if the log cannot be streamed. 164 */ 165 @Override 166 public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 167 throws IOException { 168 XLogService xLogService = Services.get().get(XLogService.class); 169 if (xLogService.isAuditLogEnabled()) { 170 // If ALL_SERVERS_PARAM is set to false, then only stream our log 171 if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { 172 new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(), 173 xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 174 } 175 // Otherwise, we have to go collate relevant logs from the other Oozie servers 176 else { 177 collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieAuditLogPath(), 178 xLogService.getOozieAuditLogName(), xLogService.getOozieAuditLogRotation(), 179 RestConstants.JOB_SHOW_AUDIT_LOG); 180 } 181 } 182 else { 183 writer.write("Audit Log streaming disabled!!"); 184 } 185 } 186 187 188 189 /** 190 * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the 191 * Writer. It will make sure to not read all of the log messages into memory at the same time to not use up the heap. If there 192 * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it. 193 * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient. 194 * 195 * @param filter the job filter 196 * @param startTime the job start time 197 * @param endTime the job end time 198 * @param writer the writer 199 * @param params the params 200 * @param logPath the log path 201 * @param logName the log name 202 * @param rotation the rotation 203 * @param logType the log type 204 * @throws IOException Signals that an I/O exception has occurred. 205 */ 206 private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer, 207 Map<String, String[]> params, String logPath, String logName, int rotation, final String logType) throws IOException { 208 XLogService xLogService = Services.get().get(XLogService.class); 209 List<String> badOozies = new ArrayList<String>(); 210 List<ServiceInstance<Map>> oozies = null; 211 try { 212 oozies = zk.getAllMetaData(); 213 } 214 catch (Exception ex) { 215 throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex); 216 } 217 List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size()); 218 try { 219 // Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser 220 for (ServiceInstance<Map> oozie : oozies) { 221 Map<String, String> oozieMeta = oozie.getPayload(); 222 String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); 223 // If it's this server, we can just get them directly 224 if (otherId.equals(zk.getZKId())) { 225 BufferedReader reader = new XLogStreamer(filter, logPath, logName, rotation).makeReader(startTime, 226 endTime); 227 parsers.add(new TimestampedMessageParser(reader, filter)); 228 } 229 // If it's another server, we'll have to use the REST API 230 else { 231 String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); 232 String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB); 233 try { 234 // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie 235 // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion) 236 final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB 237 + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logType 238 + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params); 239 // remove doAs from url to avoid failure while fetching 240 // logs in case of HA mode 241 String key = "doAs"; 242 String[] value = params.get(key); 243 String urlWithoutdoAs = null; 244 if (value != null && value.length > 0 && value[0] != null && value[0].length() > 0) { 245 urlWithoutdoAs = url.replace("&" + key + "=" + URLEncoder.encode(value[0], "UTF-8"), ""); 246 } 247 else { 248 urlWithoutdoAs = url; 249 } 250 BufferedReader reader = AuthUrlClient.callServer(urlWithoutdoAs); 251 parsers.add(new SimpleTimestampedMessageParser(reader, filter)); 252 } 253 catch(IOException ioe) { 254 log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId 255 + "] at [" + otherUrl + "]; log information may be incomplete", ioe); 256 badOozies.add(otherId); 257 } 258 } 259 } 260 261 //If log param debug is set, we need to write start date and end date to outputstream. 262 if(filter.isDebugMode()){ 263 writer.write(filter.getDebugMessage()); 264 } 265 266 // Add a message about any servers we couldn't contact 267 if (!badOozies.isEmpty()) { 268 writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n"); 269 for (String badOozie : badOozies) { 270 writer.write(" "); 271 writer.write(badOozie); 272 writer.write("\n"); 273 } 274 writer.write("\n"); 275 writer.flush(); 276 } 277 278 // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly 279 if (parsers.size() == 1) { 280 TimestampedMessageParser parser = parsers.get(0); 281 parser.processRemaining(writer, bufferLen); 282 } 283 else { 284 // Now that we have a Reader for each server to get the logs from that server, we have to collate them. Within each 285 // server, the logs should already be in the correct order, so we can take advantage of that. We'll use the 286 // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring 287 // every message into memory at the same time. 288 TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>(); 289 // populate timestampMap with initial values 290 for (TimestampedMessageParser parser : parsers) { 291 if (parser.increment()) { 292 timestampMap.put(parser.getLastTimestamp(), parser); 293 } 294 } 295 int bytesWritten = 0; 296 while (timestampMap.size() > 1) { 297 // The first entry will be the earliest based on the timestamp (also removes it) from the map 298 TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue(); 299 // Write the message from that parser at that timestamp 300 writer.write(earliestParser.getLastMessage()); 301 bytesWritten = earliestParser.getLastMessage().length(); 302 if (bytesWritten > bufferLen) { 303 writer.flush(); 304 bytesWritten = 0; 305 } 306 // Increment that parser to read the next message 307 if (earliestParser.increment()) { 308 // If it still has messages left, put it back in the map with the new last timestamp for it 309 timestampMap.put(earliestParser.getLastTimestamp(), earliestParser); 310 } 311 } 312 // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster 313 if (timestampMap.size() == 1) { 314 TimestampedMessageParser parser = timestampMap.values().iterator().next(); 315 writer.write(parser.getLastMessage()); // don't forget the last message read by the parser 316 parser.processRemaining(writer, bufferLen, bytesWritten + parser.getLastMessage().length()); 317 } 318 } 319 } 320 finally { 321 for (TimestampedMessageParser parser : parsers) { 322 parser.closeReader(); 323 } 324 writer.flush(); 325 } 326 } 327}