001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.util.AuthUrlClient;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.SimpleTimestampedMessageParser;
import org.apache.oozie.util.TimestampedMessageParser;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.ZKUtils;
044
045/**
046 * Service that performs streaming of log files over Web Services if enabled in XLogService and collates logs from other Oozie
047 * servers.  Requires that a ZooKeeper ensemble is available.
048 */
049public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable {
050
051    private ZKUtils zk;
052    private XLog log;
053
054    /**
055     * Initialize the log streaming service.
056     *
057     * @param services services instance.
058     * @throws ServiceException thrown if the log streaming service could not be initialized.
059     */
060    @Override
061    public void init(Services services) throws ServiceException {
062        super.init(services);
063        try {
064            zk = ZKUtils.register(this);
065        }
066        catch (Exception ex) {
067            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
068        }
069        log = XLog.getLog(this.getClass());
070    }
071
072    /**
073     * Destroy the log streaming service.
074     */
075    @Override
076    public void destroy() {
077        if (zk != null) {
078            zk.unregister(this);
079        }
080        zk = null;
081        super.destroy();
082    }
083
084    /**
085     * Instruments the log streaming service.
086     *
087     * @param instr instrumentation to use.
088     */
089    @Override
090    public void instrument(Instrumentation instr) {
091        super.instrument(instr);
092    }
093
094    /**
095     * Stream the log of a job.  It contacts any other running Oozie servers to collate relevant logs while streaming.
096     *
097     * @param filter log streamer filter.
098     * @param startTime start time for log events to filter.
099     * @param endTime end time for log events to filter.
100     * @param writer writer to stream the log to.
101     * @param params additional parameters from the request
102     * @throws IOException thrown if the log cannot be streamed.
103     */
104    @Override
105    public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
106            throws IOException {
107        XLogService xLogService = Services.get().get(XLogService.class);
108        if (xLogService.getLogOverWS()) {
109            // If ALL_SERVERS_PARAM is set to false, then only stream our log
110            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
111                new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
112                        xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
113            }
114            // Otherwise, we have to go collate relevant logs from the other Oozie servers
115            else {
116                collateLogs(filter, startTime, endTime, writer, params);
117            }
118        }
119        else {
120            writer.write("Log streaming disabled!!");
121        }
122    }
123
124    /**
125     * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the
126     * Writer.  It will make sure to not read all of the log messages into memory at the same time to not use up the heap.  If there
127     * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it.
128     * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient.
129     *
130     * @param filter
131     * @param startTime
132     * @param endTime
133     * @param writer
134     * @throws IOException
135     */
136    private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer,
137            Map<String, String[]> params) throws IOException {
138        XLogService xLogService = Services.get().get(XLogService.class);
139        List<String> badOozies = new ArrayList<String>();
140        List<ServiceInstance<Map>> oozies = null;
141        try {
142            oozies = zk.getAllMetaData();
143        }
144        catch (Exception ex) {
145            throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex);
146        }
147        List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size());
148        try {
149            // Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser
150            for (ServiceInstance<Map> oozie : oozies) {
151                Map<String, String> oozieMeta = oozie.getPayload();
152                String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
153                // If it's this server, we can just get them directly
154                if (otherId.equals(zk.getZKId())) {
155                    BufferedReader reader = new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
156                                                             xLogService.getOozieLogRotation()).makeReader(startTime, endTime);
157                    parsers.add(new TimestampedMessageParser(reader, filter));
158                }
159                // If it's another server, we'll have to use the REST API
160                else {
161                    String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
162                    String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB);
163                    try {
164                     // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
165                     // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
166                        final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
167                                + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + RestConstants.JOB_SHOW_LOG
168                                + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params);
169
170                        BufferedReader reader = AuthUrlClient.callServer(url);
171                        parsers.add(new SimpleTimestampedMessageParser(reader, filter));
172                    }
173                    catch(IOException ioe) {
174                        log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId
175                                + "] at [" + otherUrl + "]; log information may be incomplete", ioe);
176                        badOozies.add(otherId);
177                    }
178                }
179            }
180
181            //If log param debug is set, we need to write start date and end date to outputstream.
182            if(filter.isDebugMode()){
183                writer.write(filter.getDebugMessage());
184            }
185
186            // Add a message about any servers we couldn't contact
187            if (!badOozies.isEmpty()) {
188                writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
189                for (String badOozie : badOozies) {
190                    writer.write("     ");
191                    writer.write(badOozie);
192                    writer.write("\n");
193                }
194                writer.write("\n");
195                writer.flush();
196            }
197
198            // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly
199            if (parsers.size() == 1) {
200                TimestampedMessageParser parser = parsers.get(0);
201                parser.processRemaining(writer, bufferLen);
202            }
203            else {
204                // Now that we have a Reader for each server to get the logs from that server, we have to collate them.  Within each
205                // server, the logs should already be in the correct order, so we can take advantage of that.  We'll use the
206                // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring
207                // every message into memory at the same time.
208                TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>();
209                // populate timestampMap with initial values
210                for (TimestampedMessageParser parser : parsers) {
211                    if (parser.increment()) {
212                        timestampMap.put(parser.getLastTimestamp(), parser);
213                    }
214                }
215                int bytesWritten = 0;
216                while (timestampMap.size() > 1) {
217                    // The first entry will be the earliest based on the timestamp (also removes it) from the map
218                    TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue();
219                    // Write the message from that parser at that timestamp
220                    writer.write(earliestParser.getLastMessage());
221                    bytesWritten = earliestParser.getLastMessage().length();
222                    if (bytesWritten > bufferLen) {
223                        writer.flush();
224                        bytesWritten = 0;
225                    }
226                    // Increment that parser to read the next message
227                    if (earliestParser.increment()) {
228                        // If it still has messages left, put it back in the map with the new last timestamp for it
229                        timestampMap.put(earliestParser.getLastTimestamp(), earliestParser);
230                    }
231                }
232                // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster
233                if (timestampMap.size() == 1) {
234                    TimestampedMessageParser parser = timestampMap.values().iterator().next();
235                    writer.write(parser.getLastMessage());  // don't forget the last message read by the parser
236                    parser.processRemaining(writer, bufferLen, bytesWritten + parser.getLastMessage().length());
237                }
238            }
239        }
240        finally {
241            for (TimestampedMessageParser parser : parsers) {
242                parser.closeReader();
243            }
244            writer.flush();
245        }
246    }
247}