001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.io.BufferedReader; 021import java.io.IOException; 022import java.io.Writer; 023import java.net.URLEncoder; 024import java.util.ArrayList; 025import java.util.Date; 026import java.util.List; 027import java.util.Map; 028import java.util.TreeMap; 029 030import org.apache.commons.lang.StringUtils; 031import org.apache.curator.x.discovery.ServiceInstance; 032import org.apache.oozie.ErrorCode; 033import org.apache.oozie.client.OozieClient; 034import org.apache.oozie.client.rest.RestConstants; 035import org.apache.oozie.util.AuthUrlClient; 036import org.apache.oozie.util.Instrumentable; 037import org.apache.oozie.util.Instrumentation; 038import org.apache.oozie.util.SimpleTimestampedMessageParser; 039import org.apache.oozie.util.TimestampedMessageParser; 040import org.apache.oozie.util.XLog; 041import org.apache.oozie.util.XLogStreamer; 042import org.apache.oozie.util.ZKUtils; 043 044/** 045 * Service that performs streaming of log files over Web Services if enabled in XLogService and collates logs from other Oozie 046 * servers. Requires that a ZooKeeper ensemble is available. 047 */ 048public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable { 049 050 private ZKUtils zk; 051 private XLog log; 052 053 /** 054 * Initialize the log streaming service. 055 * 056 * @param services services instance. 057 * @throws ServiceException thrown if the log streaming service could not be initialized. 058 */ 059 @Override 060 public void init(Services services) throws ServiceException { 061 super.init(services); 062 try { 063 zk = ZKUtils.register(this); 064 } 065 catch (Exception ex) { 066 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 067 } 068 log = XLog.getLog(this.getClass()); 069 } 070 071 /** 072 * Destroy the log streaming service. 073 */ 074 @Override 075 public void destroy() { 076 if (zk != null) { 077 zk.unregister(this); 078 } 079 zk = null; 080 super.destroy(); 081 } 082 083 /** 084 * Instruments the log streaming service. 085 * 086 * @param instr instrumentation to use. 087 */ 088 @Override 089 public void instrument(Instrumentation instr) { 090 super.instrument(instr); 091 } 092 093 /** 094 * Stream the log of a job. It contacts any other running Oozie servers to collate relevant logs while streaming. 095 * 096 * @param logStreamer the log streamer 097 * @param startTime start time for log events to filter. 098 * @param endTime end time for log events to filter. 099 * @param writer writer to stream the log to. 100 * @throws IOException thrown if the log cannot be streamed. 101 */ 102 @Override 103 public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException { 104 105 if (!logStreamer.isLogEnabled()) { 106 writer.write(logStreamer.getLogDisableMessage()); 107 return; 108 } 109 // If ALL_SERVERS_PARAM is set to false, then only stream our log 110 if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(logStreamer.getRequestParam())) { 111 super.streamLog(logStreamer, startTime, endTime, writer, false); 112 } 113 // Otherwise, we have to go collate relevant logs from the other Oozie servers 114 else { 115 collateLogs(logStreamer, startTime, endTime, writer); 116 } 117 } 118 119 /** 120 * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the 121 * Writer. It will make sure to not read all of the log messages into memory at the same time to not use up the heap. If there 122 * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it. 123 * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient. 124 * 125 * @param logStreamer the XLogStreamer 126 * @param startTime the job start time 127 * @param endTime the job end time 128 * @param writer the writer 129 * @throws IOException Signals that an I/O exception has occurred. 130 */ 131 private void collateLogs(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException { 132 List<String> badOozies = new ArrayList<String>(); 133 List<ServiceInstance<Map>> oozies = null; 134 try { 135 oozies = zk.getAllMetaData(); 136 } 137 catch (Exception ex) { 138 throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex); 139 } 140 List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size()); 141 try { 142 // Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser 143 for (ServiceInstance<Map> oozie : oozies) { 144 Map<String, String> oozieMeta = oozie.getPayload(); 145 String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); 146 // If it's this server, we can just get them directly 147 if (otherId.equals(zk.getZKId())) { 148 BufferedReader reader = logStreamer.makeReader(startTime, 149 endTime); 150 parsers.add(new TimestampedMessageParser(reader, logStreamer.getXLogFilter())); 151 } 152 // If it's another server, we'll have to use the REST API 153 else { 154 String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); 155 String jobId = logStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB); 156 try { 157 // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie 158 // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion) 159 final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB 160 + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logStreamer.getLogType() 161 + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" 162 + AuthUrlClient.getQueryParamString(logStreamer.getRequestParam()); 163 // remove doAs from url to avoid failure while fetching 164 // logs in case of HA mode 165 String key = "doAs"; 166 String[] value = null; 167 if (logStreamer.getRequestParam() != null) { 168 value = logStreamer.getRequestParam().get(key); 169 } 170 String urlWithoutdoAs = null; 171 if (value != null && value.length > 0 && value[0] != null && value[0].length() > 0) { 172 urlWithoutdoAs = url.replace("&" + key + "=" + URLEncoder.encode(value[0], "UTF-8"), ""); 173 } 174 else { 175 urlWithoutdoAs = url; 176 } 177 BufferedReader reader = AuthUrlClient.callServer(urlWithoutdoAs); 178 parsers.add(new SimpleTimestampedMessageParser(reader, logStreamer.getXLogFilter())); 179 } 180 catch(IOException ioe) { 181 log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId 182 + "] at [" + otherUrl + "]; log information may be incomplete", ioe); 183 badOozies.add(otherId); 184 } 185 } 186 } 187 188 //If log param debug is set, we need to write start date and end date to outputstream. 189 if(!StringUtils.isEmpty(logStreamer.getXLogFilter().getTruncatedMessage())){ 190 writer.write(logStreamer.getXLogFilter().getTruncatedMessage()); 191 } 192 193 if (logStreamer.getXLogFilter().isDebugMode()) { 194 writer.write(logStreamer.getXLogFilter().getDebugMessage()); 195 } 196 // Add a message about any servers we couldn't contact 197 if (!badOozies.isEmpty()) { 198 writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n"); 199 for (String badOozie : badOozies) { 200 writer.write(" "); 201 writer.write(badOozie); 202 writer.write("\n"); 203 } 204 writer.write("\n"); 205 writer.flush(); 206 } 207 208 // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly 209 if (parsers.size() == 1) { 210 TimestampedMessageParser parser = parsers.get(0); 211 parser.processRemaining(writer, logStreamer); 212 } 213 else { 214 // Now that we have a Reader for each server to get the logs from that server, we have to collate them. Within each 215 // server, the logs should already be in the correct order, so we can take advantage of that. We'll use the 216 // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring 217 // every message into memory at the same time. 218 TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>(); 219 // populate timestampMap with initial values 220 for (TimestampedMessageParser parser : parsers) { 221 if (parser.increment()) { 222 timestampMap.put(parser.getLastTimestamp(), parser); 223 } 224 } 225 while (timestampMap.size() > 1) { 226 // The first entry will be the earliest based on the timestamp (also removes it) from the map 227 TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue(); 228 // Write the message from that parser at that timestamp 229 writer.write(earliestParser.getLastMessage()); 230 if (logStreamer.shouldFlushOutput(earliestParser.getLastMessage().length())) { 231 writer.flush(); 232 } 233 // Increment that parser to read the next message 234 if (earliestParser.increment()) { 235 // If it still has messages left, put it back in the map with the new last timestamp for it 236 timestampMap.put(earliestParser.getLastTimestamp(), earliestParser); 237 } 238 } 239 // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster 240 if (timestampMap.size() == 1) { 241 TimestampedMessageParser parser = timestampMap.values().iterator().next(); 242 writer.write(parser.getLastMessage()); // don't forget the last message read by the parser 243 parser.processRemaining(writer, logStreamer); 244 } 245 } 246 } 247 finally { 248 for (TimestampedMessageParser parser : parsers) { 249 parser.closeReader(); 250 } 251 } 252 } 253}