001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.util.AuthUrlClient;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.SimpleTimestampedMessageParser;
import org.apache.oozie.util.TimestampedMessageParser;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.ZKUtils;
043
044/**
045 * Service that performs streaming of log files over Web Services if enabled in XLogService and collates logs from other Oozie
046 * servers.  Requires that a ZooKeeper ensemble is available.
047 */
048public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable {
049
050    private ZKUtils zk;
051    private XLog log;
052
053    /**
054     * Initialize the log streaming service.
055     *
056     * @param services services instance.
057     * @throws ServiceException thrown if the log streaming service could not be initialized.
058     */
059    @Override
060    public void init(Services services) throws ServiceException {
061        super.init(services);
062        try {
063            zk = ZKUtils.register(this);
064        }
065        catch (Exception ex) {
066            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
067        }
068        log = XLog.getLog(this.getClass());
069    }
070
071    /**
072     * Destroy the log streaming service.
073     */
074    @Override
075    public void destroy() {
076        if (zk != null) {
077            zk.unregister(this);
078        }
079        zk = null;
080        super.destroy();
081    }
082
083    /**
084     * Instruments the log streaming service.
085     *
086     * @param instr instrumentation to use.
087     */
088    @Override
089    public void instrument(Instrumentation instr) {
090        super.instrument(instr);
091    }
092
093    /**
094     * Stream the log of a job.  It contacts any other running Oozie servers to collate relevant logs while streaming.
095     *
096     * @param filter log streamer filter.
097     * @param startTime start time for log events to filter.
098     * @param endTime end time for log events to filter.
099     * @param writer writer to stream the log to.
100     * @param params additional parameters from the request
101     * @throws IOException thrown if the log cannot be streamed.
102     */
103    @Override
104    public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
105            throws IOException {
106        XLogService xLogService = Services.get().get(XLogService.class);
107        if (xLogService.getLogOverWS()) {
108            // If ALL_SERVERS_PARAM is set to false, then only stream our log
109            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
110                new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
111                        xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
112            }
113            // Otherwise, we have to go collate relevant logs from the other Oozie servers
114            else {
115                collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(),
116                        xLogService.getOozieLogName(), xLogService.getOozieLogRotation(), RestConstants.JOB_SHOW_LOG);
117            }
118        }
119        else {
120            writer.write("Log streaming disabled!!");
121        }
122    }
123
124    /**
125     * Stream the error log of a job.  It contacts any other running Oozie servers to collate relevant error logs while streaming.
126     *
127     * @param filter log streamer filter.
128     * @param startTime start time for log events to filter.
129     * @param endTime end time for log events to filter.
130     * @param writer writer to stream the log to.
131     * @param params additional parameters from the request
132     * @throws IOException thrown if the log cannot be streamed.
133     */
134    public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
135            throws IOException {
136        XLogService xLogService = Services.get().get(XLogService.class);
137        if (xLogService.isErrorLogEnabled()) {
138            // If ALL_SERVERS_PARAM is set to false, then only stream our log
139            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
140                new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(),
141                        xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
142            }
143            // Otherwise, we have to go collate relevant logs from the other Oozie servers
144            else {
145                collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(),
146                        xLogService.getOozieErrorLogName(), xLogService.getOozieErrorLogRotation(),
147                        RestConstants.JOB_SHOW_ERROR_LOG);
148            }
149        }
150        else {
151            writer.write("Error Log streaming disabled!!");
152        }
153    }
154
155    /**
156     * Stream the audit log of a job.  It contacts any other running Oozie servers to collate relevant audit logs while streaming.
157     *
158     * @param filter log streamer filter.
159     * @param startTime start time for log events to filter.
160     * @param endTime end time for log events to filter.
161     * @param writer writer to stream the log to.
162     * @param params additional parameters from the request
163     * @throws IOException thrown if the log cannot be streamed.
164     */
165    @Override
166    public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
167            throws IOException {
168        XLogService xLogService = Services.get().get(XLogService.class);
169        if (xLogService.isAuditLogEnabled()) {
170            // If ALL_SERVERS_PARAM is set to false, then only stream our log
171            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
172                new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(),
173                        xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
174            }
175            // Otherwise, we have to go collate relevant logs from the other Oozie servers
176            else {
177                collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieAuditLogPath(),
178                        xLogService.getOozieAuditLogName(), xLogService.getOozieAuditLogRotation(),
179                        RestConstants.JOB_SHOW_AUDIT_LOG);
180            }
181        }
182        else {
183            writer.write("Audit Log streaming disabled!!");
184        }
185    }
186
187
188
189    /**
190     * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the
191     * Writer.  It will make sure to not read all of the log messages into memory at the same time to not use up the heap.  If there
192     * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it.
193     * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient.
194     *
195     * @param filter the job filter
196     * @param startTime the job start time
197     * @param endTime the job end time
198     * @param writer the writer
199     * @param params the params
200     * @param logPath the log path
201     * @param logName the log name
202     * @param rotation the rotation
203     * @param logType the log type
204     * @throws IOException Signals that an I/O exception has occurred.
205     */
206    private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer,
207            Map<String, String[]> params, String logPath, String logName, int rotation, final String logType) throws IOException {
208        XLogService xLogService = Services.get().get(XLogService.class);
209        List<String> badOozies = new ArrayList<String>();
210        List<ServiceInstance<Map>> oozies = null;
211        try {
212            oozies = zk.getAllMetaData();
213        }
214        catch (Exception ex) {
215            throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex);
216        }
217        List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size());
218        try {
219            // Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser
220            for (ServiceInstance<Map> oozie : oozies) {
221                Map<String, String> oozieMeta = oozie.getPayload();
222                String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
223                // If it's this server, we can just get them directly
224                if (otherId.equals(zk.getZKId())) {
225                    BufferedReader reader = new XLogStreamer(filter, logPath, logName, rotation).makeReader(startTime,
226                            endTime);
227                    parsers.add(new TimestampedMessageParser(reader, filter));
228                }
229                // If it's another server, we'll have to use the REST API
230                else {
231                    String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
232                    String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB);
233                    try {
234                     // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
235                     // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
236                        final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
237                                + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logType
238                                + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params);
239                        // remove doAs from url to avoid failure while fetching
240                        // logs in case of HA mode
241                        String key = "doAs";
242                        String[] value = params.get(key);
243                        String urlWithoutdoAs = null;
244                        if (value != null && value.length > 0 && value[0] != null && value[0].length() > 0) {
245                            urlWithoutdoAs = url.replace("&" + key + "=" + URLEncoder.encode(value[0], "UTF-8"), "");
246                        }
247                        else {
248                            urlWithoutdoAs = url;
249                        }
250                        BufferedReader reader = AuthUrlClient.callServer(urlWithoutdoAs);
251                        parsers.add(new SimpleTimestampedMessageParser(reader, filter));
252                    }
253                    catch(IOException ioe) {
254                        log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId
255                                + "] at [" + otherUrl + "]; log information may be incomplete", ioe);
256                        badOozies.add(otherId);
257                    }
258                }
259            }
260
261            //If log param debug is set, we need to write start date and end date to outputstream.
262            if(filter.isDebugMode()){
263                writer.write(filter.getDebugMessage());
264            }
265
266            // Add a message about any servers we couldn't contact
267            if (!badOozies.isEmpty()) {
268                writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
269                for (String badOozie : badOozies) {
270                    writer.write("     ");
271                    writer.write(badOozie);
272                    writer.write("\n");
273                }
274                writer.write("\n");
275                writer.flush();
276            }
277
278            // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly
279            if (parsers.size() == 1) {
280                TimestampedMessageParser parser = parsers.get(0);
281                parser.processRemaining(writer, bufferLen);
282            }
283            else {
284                // Now that we have a Reader for each server to get the logs from that server, we have to collate them.  Within each
285                // server, the logs should already be in the correct order, so we can take advantage of that.  We'll use the
286                // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring
287                // every message into memory at the same time.
288                TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>();
289                // populate timestampMap with initial values
290                for (TimestampedMessageParser parser : parsers) {
291                    if (parser.increment()) {
292                        timestampMap.put(parser.getLastTimestamp(), parser);
293                    }
294                }
295                int bytesWritten = 0;
296                while (timestampMap.size() > 1) {
297                    // The first entry will be the earliest based on the timestamp (also removes it) from the map
298                    TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue();
299                    // Write the message from that parser at that timestamp
300                    writer.write(earliestParser.getLastMessage());
301                    bytesWritten = earliestParser.getLastMessage().length();
302                    if (bytesWritten > bufferLen) {
303                        writer.flush();
304                        bytesWritten = 0;
305                    }
306                    // Increment that parser to read the next message
307                    if (earliestParser.increment()) {
308                        // If it still has messages left, put it back in the map with the new last timestamp for it
309                        timestampMap.put(earliestParser.getLastTimestamp(), earliestParser);
310                    }
311                }
312                // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster
313                if (timestampMap.size() == 1) {
314                    TimestampedMessageParser parser = timestampMap.values().iterator().next();
315                    writer.write(parser.getLastMessage());  // don't forget the last message read by the parser
316                    parser.processRemaining(writer, bufferLen, bytesWritten + parser.getLastMessage().length());
317                }
318            }
319        }
320        finally {
321            for (TimestampedMessageParser parser : parsers) {
322                parser.closeReader();
323            }
324            writer.flush();
325        }
326    }
327}