This project has retired. For details please refer to its Attic page.
Source code
001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.util.AuthUrlClient;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.SimpleTimestampedMessageParser;
import org.apache.oozie.util.TimestampedMessageParser;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.ZKUtils;
044
045/**
046 * Service that performs streaming of log files over Web Services if enabled in XLogService and collates logs from other Oozie
047 * servers.  Requires that a ZooKeeper ensemble is available.
048 */
049public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable {
050
051    private ZKUtils zk;
052    private XLog log;
053
054    /**
055     * Initialize the log streaming service.
056     *
057     * @param services services instance.
058     * @throws ServiceException thrown if the log streaming service could not be initialized.
059     */
060    @Override
061    public void init(Services services) throws ServiceException {
062        super.init(services);
063        try {
064            zk = ZKUtils.register(this);
065        }
066        catch (Exception ex) {
067            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
068        }
069        log = XLog.getLog(this.getClass());
070    }
071
072    /**
073     * Destroy the log streaming service.
074     */
075    @Override
076    public void destroy() {
077        if (zk != null) {
078            zk.unregister(this);
079        }
080        zk = null;
081        super.destroy();
082    }
083
084    /**
085     * Instruments the log streaming service.
086     *
087     * @param instr instrumentation to use.
088     */
089    @Override
090    public void instrument(Instrumentation instr) {
091        super.instrument(instr);
092    }
093
094    /**
095     * Stream the log of a job.  It contacts any other running Oozie servers to collate relevant logs while streaming.
096     *
097     * @param filter log streamer filter.
098     * @param startTime start time for log events to filter.
099     * @param endTime end time for log events to filter.
100     * @param writer writer to stream the log to.
101     * @param params additional parameters from the request
102     * @throws IOException thrown if the log cannot be streamed.
103     */
104    @Override
105    public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
106            throws IOException {
107        XLogService xLogService = Services.get().get(XLogService.class);
108        if (xLogService.getLogOverWS()) {
109            // If ALL_SERVERS_PARAM is set to false, then only stream our log
110            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
111                new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
112                        xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
113            }
114            // Otherwise, we have to go collate relevant logs from the other Oozie servers
115            else {
116                collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(),
117                        xLogService.getOozieLogName(), xLogService.getOozieLogRotation(), RestConstants.JOB_SHOW_LOG);
118            }
119        }
120        else {
121            writer.write("Log streaming disabled!!");
122        }
123    }
124
125    /**
126     * Stream the error log of a job.  It contacts any other running Oozie servers to collate relevant error logs while streaming.
127     *
128     * @param filter log streamer filter.
129     * @param startTime start time for log events to filter.
130     * @param endTime end time for log events to filter.
131     * @param writer writer to stream the log to.
132     * @param params additional parameters from the request
133     * @throws IOException thrown if the log cannot be streamed.
134     */
135    public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
136            throws IOException {
137        XLogService xLogService = Services.get().get(XLogService.class);
138        if (xLogService.isErrorLogEnabled()) {
139            // If ALL_SERVERS_PARAM is set to false, then only stream our log
140            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
141                new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(),
142                        xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
143            }
144            // Otherwise, we have to go collate relevant logs from the other Oozie servers
145            else {
146                collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(),
147                        xLogService.getOozieErrorLogName(), xLogService.getOozieErrorLogRotation(),
148                        RestConstants.JOB_SHOW_ERROR_LOG);
149            }
150        }
151        else {
152            writer.write("Error Log streaming disabled!!");
153        }
154    }
155
156    /**
157     * Stream the audit log of a job.  It contacts any other running Oozie servers to collate relevant audit logs while streaming.
158     *
159     * @param filter log streamer filter.
160     * @param startTime start time for log events to filter.
161     * @param endTime end time for log events to filter.
162     * @param writer writer to stream the log to.
163     * @param params additional parameters from the request
164     * @throws IOException thrown if the log cannot be streamed.
165     */
166    @Override
167    public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
168            throws IOException {
169        XLogService xLogService = Services.get().get(XLogService.class);
170        if (xLogService.isAuditLogEnabled()) {
171            // If ALL_SERVERS_PARAM is set to false, then only stream our log
172            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
173                new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(),
174                        xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
175            }
176            // Otherwise, we have to go collate relevant logs from the other Oozie servers
177            else {
178                collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieAuditLogPath(),
179                        xLogService.getOozieAuditLogName(), xLogService.getOozieAuditLogRotation(),
180                        RestConstants.JOB_SHOW_AUDIT_LOG);
181            }
182        }
183        else {
184            writer.write("Audit Log streaming disabled!!");
185        }
186    }
187
188
189
190    /**
191     * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the
192     * Writer.  It will make sure to not read all of the log messages into memory at the same time to not use up the heap.  If there
193     * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it.
194     * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient.
195     *
196     * @param filter the job filter
197     * @param startTime the job start time
198     * @param endTime the job end time
199     * @param writer the writer
200     * @param params the params
201     * @param logPath the log path
202     * @param logName the log name
203     * @param rotation the rotation
204     * @param logType the log type
205     * @throws IOException Signals that an I/O exception has occurred.
206     */
207    private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer,
208            Map<String, String[]> params, String logPath, String logName, int rotation, final String logType) throws IOException {
209        XLogService xLogService = Services.get().get(XLogService.class);
210        List<String> badOozies = new ArrayList<String>();
211        List<ServiceInstance<Map>> oozies = null;
212        try {
213            oozies = zk.getAllMetaData();
214        }
215        catch (Exception ex) {
216            throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex);
217        }
218        List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size());
219        try {
220            // Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser
221            for (ServiceInstance<Map> oozie : oozies) {
222                Map<String, String> oozieMeta = oozie.getPayload();
223                String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
224                // If it's this server, we can just get them directly
225                if (otherId.equals(zk.getZKId())) {
226                    BufferedReader reader = new XLogStreamer(filter, logPath, logName, rotation).makeReader(startTime,
227                            endTime);
228                    parsers.add(new TimestampedMessageParser(reader, filter));
229                }
230                // If it's another server, we'll have to use the REST API
231                else {
232                    String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
233                    String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB);
234                    try {
235                     // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
236                     // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
237                        final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
238                                + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logType
239                                + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params);
240
241                        BufferedReader reader = AuthUrlClient.callServer(url);
242                        parsers.add(new SimpleTimestampedMessageParser(reader, filter));
243                    }
244                    catch(IOException ioe) {
245                        log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId
246                                + "] at [" + otherUrl + "]; log information may be incomplete", ioe);
247                        badOozies.add(otherId);
248                    }
249                }
250            }
251
252            //If log param debug is set, we need to write start date and end date to outputstream.
253            if(filter.isDebugMode()){
254                writer.write(filter.getDebugMessage());
255            }
256
257            // Add a message about any servers we couldn't contact
258            if (!badOozies.isEmpty()) {
259                writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
260                for (String badOozie : badOozies) {
261                    writer.write("     ");
262                    writer.write(badOozie);
263                    writer.write("\n");
264                }
265                writer.write("\n");
266                writer.flush();
267            }
268
269            // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly
270            if (parsers.size() == 1) {
271                TimestampedMessageParser parser = parsers.get(0);
272                parser.processRemaining(writer, bufferLen);
273            }
274            else {
275                // Now that we have a Reader for each server to get the logs from that server, we have to collate them.  Within each
276                // server, the logs should already be in the correct order, so we can take advantage of that.  We'll use the
277                // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring
278                // every message into memory at the same time.
279                TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>();
280                // populate timestampMap with initial values
281                for (TimestampedMessageParser parser : parsers) {
282                    if (parser.increment()) {
283                        timestampMap.put(parser.getLastTimestamp(), parser);
284                    }
285                }
286                int bytesWritten = 0;
287                while (timestampMap.size() > 1) {
288                    // The first entry will be the earliest based on the timestamp (also removes it) from the map
289                    TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue();
290                    // Write the message from that parser at that timestamp
291                    writer.write(earliestParser.getLastMessage());
292                    bytesWritten = earliestParser.getLastMessage().length();
293                    if (bytesWritten > bufferLen) {
294                        writer.flush();
295                        bytesWritten = 0;
296                    }
297                    // Increment that parser to read the next message
298                    if (earliestParser.increment()) {
299                        // If it still has messages left, put it back in the map with the new last timestamp for it
300                        timestampMap.put(earliestParser.getLastTimestamp(), earliestParser);
301                    }
302                }
303                // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster
304                if (timestampMap.size() == 1) {
305                    TimestampedMessageParser parser = timestampMap.values().iterator().next();
306                    writer.write(parser.getLastMessage());  // don't forget the last message read by the parser
307                    parser.processRemaining(writer, bufferLen, bytesWritten + parser.getLastMessage().length());
308                }
309            }
310        }
311        finally {
312            for (TimestampedMessageParser parser : parsers) {
313                parser.closeReader();
314            }
315            writer.flush();
316        }
317    }
318}