001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;
029
030import org.apache.commons.lang.StringUtils;
031import org.apache.curator.x.discovery.ServiceInstance;
032import org.apache.oozie.ErrorCode;
033import org.apache.oozie.client.OozieClient;
034import org.apache.oozie.client.rest.RestConstants;
035import org.apache.oozie.util.AuthUrlClient;
036import org.apache.oozie.util.Instrumentable;
037import org.apache.oozie.util.Instrumentation;
038import org.apache.oozie.util.SimpleTimestampedMessageParser;
039import org.apache.oozie.util.TimestampedMessageParser;
040import org.apache.oozie.util.XLog;
041import org.apache.oozie.util.XLogStreamer;
042import org.apache.oozie.util.ZKUtils;
043
044/**
045 * Service that performs streaming of log files over Web Services if enabled in XLogService and collates logs from other Oozie
046 * servers.  Requires that a ZooKeeper ensemble is available.
047 */
048public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable {
049
050    private ZKUtils zk;
051    private XLog log;
052
053    /**
054     * Initialize the log streaming service.
055     *
056     * @param services services instance.
057     * @throws ServiceException thrown if the log streaming service could not be initialized.
058     */
059    @Override
060    public void init(Services services) throws ServiceException {
061        super.init(services);
062        try {
063            zk = ZKUtils.register(this);
064        }
065        catch (Exception ex) {
066            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
067        }
068        log = XLog.getLog(this.getClass());
069    }
070
071    /**
072     * Destroy the log streaming service.
073     */
074    @Override
075    public void destroy() {
076        if (zk != null) {
077            zk.unregister(this);
078        }
079        zk = null;
080        super.destroy();
081    }
082
083    /**
084     * Instruments the log streaming service.
085     *
086     * @param instr instrumentation to use.
087     */
088    @Override
089    public void instrument(Instrumentation instr) {
090        super.instrument(instr);
091    }
092
093    /**
094     * Stream the log of a job.  It contacts any other running Oozie servers to collate relevant logs while streaming.
095     *
096     * @param logStreamer the log streamer
097     * @param startTime start time for log events to filter.
098     * @param endTime end time for log events to filter.
099     * @param writer writer to stream the log to.
100     * @throws IOException thrown if the log cannot be streamed.
101     */
102    @Override
103    public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException {
104
105        if (!logStreamer.isLogEnabled()) {
106            writer.write(logStreamer.getLogDisableMessage());
107            return;
108        }
109        // If ALL_SERVERS_PARAM is set to false, then only stream our log
110        if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(logStreamer.getRequestParam())) {
111            super.streamLog(logStreamer, startTime, endTime, writer, false);
112        }
113        // Otherwise, we have to go collate relevant logs from the other Oozie servers
114        else {
115            collateLogs(logStreamer, startTime, endTime, writer);
116        }
117    }
118
119    /**
120     * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the
121     * Writer.  It will make sure to not read all of the log messages into memory at the same time to not use up the heap.  If there
122     * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it.
123     * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient.
124     *
125     * @param logStreamer the XLogStreamer
126     * @param startTime the job start time
127     * @param endTime the job end time
128     * @param writer the writer
129     * @throws IOException Signals that an I/O exception has occurred.
130     */
131    private void collateLogs(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException {
132        List<String> badOozies = new ArrayList<String>();
133        List<ServiceInstance<Map>> oozies = null;
134        try {
135            oozies = zk.getAllMetaData();
136        }
137        catch (Exception ex) {
138            throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex);
139        }
140        List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size());
141        try {
142            // Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser
143            for (ServiceInstance<Map> oozie : oozies) {
144                Map<String, String> oozieMeta = oozie.getPayload();
145                String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
146                // If it's this server, we can just get them directly
147                if (otherId.equals(zk.getZKId())) {
148                    BufferedReader reader = logStreamer.makeReader(startTime,
149                            endTime);
150                    parsers.add(new TimestampedMessageParser(reader, logStreamer.getXLogFilter()));
151                }
152                // If it's another server, we'll have to use the REST API
153                else {
154                    String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
155                    String jobId = logStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB);
156                    try {
157                     // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
158                     // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
159                        final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
160                                + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logStreamer.getLogType()
161                                + "&" + RestConstants.ALL_SERVER_REQUEST + "=false"
162                                + AuthUrlClient.getQueryParamString(logStreamer.getRequestParam());
163                        // remove doAs from url to avoid failure while fetching
164                        // logs in case of HA mode
165                        String key = "doAs";
166                        String[] value = null;
167                        if (logStreamer.getRequestParam() != null) {
168                            value = logStreamer.getRequestParam().get(key);
169                        }
170                        String urlWithoutdoAs = null;
171                        if (value != null && value.length > 0 && value[0] != null && value[0].length() > 0) {
172                            urlWithoutdoAs = url.replace("&" + key + "=" + URLEncoder.encode(value[0], "UTF-8"), "");
173                        }
174                        else {
175                            urlWithoutdoAs = url;
176                        }
177                        BufferedReader reader = AuthUrlClient.callServer(urlWithoutdoAs);
178                        parsers.add(new SimpleTimestampedMessageParser(reader, logStreamer.getXLogFilter()));
179                    }
180                    catch(IOException ioe) {
181                        log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId
182                                + "] at [" + otherUrl + "]; log information may be incomplete", ioe);
183                        badOozies.add(otherId);
184                    }
185                }
186            }
187
188            //If log param debug is set, we need to write start date and end date to outputstream.
189            if(!StringUtils.isEmpty(logStreamer.getXLogFilter().getTruncatedMessage())){
190                writer.write(logStreamer.getXLogFilter().getTruncatedMessage());
191            }
192
193            if (logStreamer.getXLogFilter().isDebugMode()) {
194                writer.write(logStreamer.getXLogFilter().getDebugMessage());
195            }
196            // Add a message about any servers we couldn't contact
197            if (!badOozies.isEmpty()) {
198                writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
199                for (String badOozie : badOozies) {
200                    writer.write("     ");
201                    writer.write(badOozie);
202                    writer.write("\n");
203                }
204                writer.write("\n");
205                writer.flush();
206            }
207
208            // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly
209            if (parsers.size() == 1) {
210                TimestampedMessageParser parser = parsers.get(0);
211                parser.processRemaining(writer, logStreamer);
212            }
213            else {
214                // Now that we have a Reader for each server to get the logs from that server, we have to collate them.  Within each
215                // server, the logs should already be in the correct order, so we can take advantage of that.  We'll use the
216                // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring
217                // every message into memory at the same time.
218                TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>();
219                // populate timestampMap with initial values
220                for (TimestampedMessageParser parser : parsers) {
221                    if (parser.increment()) {
222                        timestampMap.put(parser.getLastTimestamp(), parser);
223                    }
224                }
225                while (timestampMap.size() > 1) {
226                    // The first entry will be the earliest based on the timestamp (also removes it) from the map
227                    TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue();
228                    // Write the message from that parser at that timestamp
229                    writer.write(earliestParser.getLastMessage());
230                    if (logStreamer.shouldFlushOutput(earliestParser.getLastMessage().length())) {
231                        writer.flush();
232                    }
233                    // Increment that parser to read the next message
234                    if (earliestParser.increment()) {
235                        // If it still has messages left, put it back in the map with the new last timestamp for it
236                        timestampMap.put(earliestParser.getLastTimestamp(), earliestParser);
237                    }
238                }
239                // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster
240                if (timestampMap.size() == 1) {
241                    TimestampedMessageParser parser = timestampMap.values().iterator().next();
242                    writer.write(parser.getLastMessage());  // don't forget the last message read by the parser
243                    parser.processRemaining(writer, logStreamer);
244                }
245            }
246        }
247        finally {
248            for (TimestampedMessageParser parser : parsers) {
249                parser.closeReader();
250            }
251        }
252    }
253}