001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.io.BufferedReader; 021 022import org.apache.oozie.util.Instrumentable; 023import org.apache.oozie.util.Instrumentation; 024import org.apache.oozie.util.XLogStreamer; 025 026import java.io.IOException; 027import java.io.Writer; 028import java.util.ArrayList; 029import java.util.Date; 030import java.util.List; 031import java.util.Map; 032import java.util.TreeMap; 033 034import org.apache.curator.x.discovery.ServiceInstance; 035import org.apache.oozie.ErrorCode; 036import org.apache.oozie.client.OozieClient; 037import org.apache.oozie.client.rest.RestConstants; 038import org.apache.oozie.util.AuthUrlClient; 039import org.apache.oozie.util.XLogFilter; 040import org.apache.oozie.util.SimpleTimestampedMessageParser; 041import org.apache.oozie.util.TimestampedMessageParser; 042import org.apache.oozie.util.XLog; 043import org.apache.oozie.util.ZKUtils; 044 045/** 046 * Service that performs streaming of log files over Web Services if enabled in XLogService and collates logs from other Oozie 047 * servers. Requires that a ZooKeeper ensemble is available. 048 */ 049public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable { 050 051 private ZKUtils zk; 052 private XLog log; 053 054 /** 055 * Initialize the log streaming service. 056 * 057 * @param services services instance. 058 * @throws ServiceException thrown if the log streaming service could not be initialized. 059 */ 060 @Override 061 public void init(Services services) throws ServiceException { 062 super.init(services); 063 try { 064 zk = ZKUtils.register(this); 065 } 066 catch (Exception ex) { 067 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 068 } 069 log = XLog.getLog(this.getClass()); 070 } 071 072 /** 073 * Destroy the log streaming service. 074 */ 075 @Override 076 public void destroy() { 077 if (zk != null) { 078 zk.unregister(this); 079 } 080 zk = null; 081 super.destroy(); 082 } 083 084 /** 085 * Instruments the log streaming service. 086 * 087 * @param instr instrumentation to use. 088 */ 089 @Override 090 public void instrument(Instrumentation instr) { 091 super.instrument(instr); 092 } 093 094 /** 095 * Stream the log of a job. It contacts any other running Oozie servers to collate relevant logs while streaming. 096 * 097 * @param filter log streamer filter. 098 * @param startTime start time for log events to filter. 099 * @param endTime end time for log events to filter. 100 * @param writer writer to stream the log to. 101 * @param params additional parameters from the request 102 * @throws IOException thrown if the log cannot be streamed. 103 */ 104 @Override 105 public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params) 106 throws IOException { 107 XLogService xLogService = Services.get().get(XLogService.class); 108 if (xLogService.getLogOverWS()) { 109 // If ALL_SERVERS_PARAM is set to false, then only stream our log 110 if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) { 111 new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), 112 xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen); 113 } 114 // Otherwise, we have to go collate relevant logs from the other Oozie servers 115 else { 116 collateLogs(filter, startTime, endTime, writer, params); 117 } 118 } 119 else { 120 writer.write("Log streaming disabled!!"); 121 } 122 } 123 124 /** 125 * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the 126 * Writer. It will make sure to not read all of the log messages into memory at the same time to not use up the heap. If there 127 * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it. 128 * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient. 129 * 130 * @param filter 131 * @param startTime 132 * @param endTime 133 * @param writer 134 * @throws IOException 135 */ 136 private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer, 137 Map<String, String[]> params) throws IOException { 138 XLogService xLogService = Services.get().get(XLogService.class); 139 List<String> badOozies = new ArrayList<String>(); 140 List<ServiceInstance<Map>> oozies = null; 141 try { 142 oozies = zk.getAllMetaData(); 143 } 144 catch (Exception ex) { 145 throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex); 146 } 147 List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size()); 148 try { 149 // Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser 150 for (ServiceInstance<Map> oozie : oozies) { 151 Map<String, String> oozieMeta = oozie.getPayload(); 152 String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); 153 // If it's this server, we can just get them directly 154 if (otherId.equals(zk.getZKId())) { 155 BufferedReader reader = new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), 156 xLogService.getOozieLogRotation()).makeReader(startTime, endTime); 157 parsers.add(new TimestampedMessageParser(reader, filter)); 158 } 159 // If it's another server, we'll have to use the REST API 160 else { 161 String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); 162 String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB); 163 try { 164 // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie 165 // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion) 166 final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB 167 + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + RestConstants.JOB_SHOW_LOG 168 + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params); 169 170 BufferedReader reader = AuthUrlClient.callServer(url); 171 parsers.add(new SimpleTimestampedMessageParser(reader, filter)); 172 } 173 catch(IOException ioe) { 174 log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId 175 + "] at [" + otherUrl + "]; log information may be incomplete", ioe); 176 badOozies.add(otherId); 177 } 178 } 179 } 180 181 //If log param debug is set, we need to write start date and end date to outputstream. 182 if(filter.isDebugMode()){ 183 writer.write(filter.getDebugMessage()); 184 } 185 186 // Add a message about any servers we couldn't contact 187 if (!badOozies.isEmpty()) { 188 writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n"); 189 for (String badOozie : badOozies) { 190 writer.write(" "); 191 writer.write(badOozie); 192 writer.write("\n"); 193 } 194 writer.write("\n"); 195 writer.flush(); 196 } 197 198 // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly 199 if (parsers.size() == 1) { 200 TimestampedMessageParser parser = parsers.get(0); 201 parser.processRemaining(writer, bufferLen); 202 } 203 else { 204 // Now that we have a Reader for each server to get the logs from that server, we have to collate them. Within each 205 // server, the logs should already be in the correct order, so we can take advantage of that. We'll use the 206 // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring 207 // every message into memory at the same time. 208 TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>(); 209 // populate timestampMap with initial values 210 for (TimestampedMessageParser parser : parsers) { 211 if (parser.increment()) { 212 timestampMap.put(parser.getLastTimestamp(), parser); 213 } 214 } 215 int bytesWritten = 0; 216 while (timestampMap.size() > 1) { 217 // The first entry will be the earliest based on the timestamp (also removes it) from the map 218 TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue(); 219 // Write the message from that parser at that timestamp 220 writer.write(earliestParser.getLastMessage()); 221 bytesWritten = earliestParser.getLastMessage().length(); 222 if (bytesWritten > bufferLen) { 223 writer.flush(); 224 bytesWritten = 0; 225 } 226 // Increment that parser to read the next message 227 if (earliestParser.increment()) { 228 // If it still has messages left, put it back in the map with the new last timestamp for it 229 timestampMap.put(earliestParser.getLastTimestamp(), earliestParser); 230 } 231 } 232 // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster 233 if (timestampMap.size() == 1) { 234 TimestampedMessageParser parser = timestampMap.values().iterator().next(); 235 writer.write(parser.getLastMessage()); // don't forget the last message read by the parser 236 parser.processRemaining(writer, bufferLen, bytesWritten + parser.getLastMessage().length()); 237 } 238 } 239 } 240 finally { 241 for (TimestampedMessageParser parser : parsers) { 242 parser.closeReader(); 243 } 244 writer.flush(); 245 } 246 } 247}