From f25e39c8b7b715fb098513af10e4fbf64d734fc1 Mon Sep 17 00:00:00 2001 From: Steven Chen Date: Thu, 12 Oct 2017 14:05:48 +0800 Subject: [PATCH 1/3] Add limit to query result buffer --- docs/content/configuration/broker.md | 2 + .../java/io/druid/query/QueryContexts.java | 43 ++++++--- .../io/druid/client/DirectDruidClient.java | 90 +++++++++++++++---- .../server/initialization/ServerConfig.java | 45 ++++++++-- 4 files changed, 149 insertions(+), 31 deletions(-) diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index af97d2dffa50..c34de70c6b32 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -44,6 +44,8 @@ Druid uses Jetty to serve HTTP requests. |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20| |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| +|`druid.broker.http.maxBufferSizeBytes`|The maximum number of bytes collected from query nodes to buffer at broker to execute a query. Compared to `maxScatterGatherBytes`, this advanced configuration protects broker having OOMs without aborting the query immediately when reaching maximum buffer size limit. This limit can be further reduced at query time using `maxBufferSizeBytes` in the context. |Long.MAX_VALUE| +|`druid.broker.http.queryBufferingTimeout`|The timeout in milliseconds, beyond which broker will stop waiting for buffer space to accept data from query nodes and abort the query. This timeout can be further adjusted at query time using `queryBufferingTimeout` in the context |300000| #### Retry Policy diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index b56812d8b0e5..b29d088d0692 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -33,6 +33,8 @@ public class QueryContexts public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes"; public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; + public static final String MAX_BUFFER_SIZE_BYTES = "maxBufferSizeBytes"; + public static final String QUERY_BUFFERING_TIMEOUT_KEY = "queryBufferingTimeout"; public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; @@ -112,31 +114,52 @@ public static String getChunkPeriod(Query query) return query.getContextValue(CHUNK_PERIOD_KEY, "P0D"); } - public static Query withMaxScatterGatherBytes(Query query, long maxScatterGatherBytesLimit) + private static Query withCustomizedArgument(Query query, String key, long value) { - Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY); + Object obj = query.getContextValue(key); if (obj == null) { - return query.withOverriddenContext(ImmutableMap.of(MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit)); + return query.withOverriddenContext(ImmutableMap.of(key, value)); } else { long curr = ((Number) obj).longValue(); - if (curr > maxScatterGatherBytesLimit) { - throw new IAE( - "configured [%s = %s] is more than enforced limit of [%s].", - MAX_SCATTER_GATHER_BYTES_KEY, - curr, - maxScatterGatherBytesLimit - ); + if (curr > value) { + String err = "configured [%s = %s] is more than enforced limit of [%s]."; + throw new IAE(err, key, curr, value); } else { return query; } } } + public static Query withMaxScatterGatherBytes(Query query, long maxScatterGatherBytesLimit) + { + return withCustomizedArgument(query, MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit); + } + public static long getMaxScatterGatherBytes(Query query) { return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); } + public static Query withMaxBufferSizeBytes(Query query, long maxBufferSizeBytes) + { + return withCustomizedArgument(query, MAX_BUFFER_SIZE_BYTES, maxBufferSizeBytes); + } + + public static long getMaxBufferSizeBytes(Query query) + { + return parseLong(query, MAX_BUFFER_SIZE_BYTES, Long.MAX_VALUE); + } + + public static Query withQueryBufferingTimeout(Query query, long bufferingTimeout) + { + return query.withOverriddenContext(ImmutableMap.of(QUERY_BUFFERING_TIMEOUT_KEY, bufferingTimeout)); + } + + public static long getQueryBufferingTimeout(Query query) + { + return parseLong(query, QUERY_BUFFERING_TIMEOUT_KEY, DEFAULT_TIMEOUT_MILLIS); + } + public static boolean hasTimeout(Query query) { return getTimeout(query) != NO_TIMEOUT; diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 16d01a6be6b8..e5d39acb38c4 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -123,7 +123,9 @@ public static > QueryType withDefaultTimeoutAndMax { return (QueryType) QueryContexts.withMaxScatterGatherBytes( QueryContexts.withDefaultTimeout( - (Query) query, + QueryContexts.withQueryBufferingTimeout( + QueryContexts.withMaxBufferSizeBytes((Query) query, serverConfig.getMaxBufferSizeBytes()), + serverConfig.getQueryBufferingTimeout()), serverConfig.getDefaultQueryTimeout() ), serverConfig.getMaxScatterGatherBytes() @@ -214,14 +216,17 @@ public Sequence run(final QueryPlus queryPlus, final Map c final long requestStartTimeNs = System.nanoTime(); - long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue(); - long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); + final long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue(); + final long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); + final long maxBufferSizeBytes = QueryContexts.getMaxBufferSizeBytes(query); + final long queryBufferingTimeout = QueryContexts.getQueryBufferingTimeout(query); AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED); + AtomicLong totalBufferedBytes = new AtomicLong(0); final HttpResponseHandler responseHandler = new HttpResponseHandler() { private final AtomicLong byteCount = new AtomicLong(0); - private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); private final AtomicReference fail = new AtomicReference<>(); @@ -241,7 +246,9 @@ private QueryMetrics> acquireResponseMetrics() public ClientResponse handleResponse(HttpResponse response) { checkQueryTimeout(); - checkTotalBytesLimit(response.getContent().readableBytes()); + + final long bytes = response.getContent().readableBytes(); + checkTotalBytesLimit(bytes); log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId()); responseStartTimeNs = System.nanoTime(); @@ -257,7 +264,9 @@ public ClientResponse handleResponse(HttpResponse response) ) ); } - queue.put(new ChannelBufferInputStream(response.getContent())); + + checkBufferCapacity(bytes, queryBufferingTimeout); + queue.put(new BufferedStream(new ChannelBufferInputStream(response.getContent()), bytes)); } catch (final IOException e) { log.error(e, "Error parsing response context from url [%s]", url); @@ -277,7 +286,7 @@ public int read() throws IOException Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(response.getContent().readableBytes()); + byteCount.addAndGet(bytes); return ClientResponse.finished( new SequenceInputStream( new Enumeration() @@ -305,9 +314,10 @@ public InputStream nextElement() } try { - InputStream is = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS); - if (is != null) { - return is; + BufferedStream stream = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS); + totalBufferedBytes.addAndGet(-stream.getBytes()); + if (stream.getInputStream() != null) { + return stream.getInputStream(); } else { throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); } @@ -331,12 +341,12 @@ public ClientResponse handleChunk( final ChannelBuffer channelBuffer = chunk.getContent(); final int bytes = channelBuffer.readableBytes(); - checkTotalBytesLimit(bytes); if (bytes > 0) { try { - queue.put(new ChannelBufferInputStream(channelBuffer)); + checkBufferCapacity(bytes, queryBufferingTimeout); + queue.put(new BufferedStream(new ChannelBufferInputStream(channelBuffer), bytes)); } catch (InterruptedException e) { log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url); @@ -370,7 +380,7 @@ public ClientResponse done(ClientResponse clientRespon try { // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out // after done is set to true, regardless of the rest of the stream's state. - queue.put(ByteSource.empty().openStream()); + queue.put(new BufferedStream(ByteSource.empty().openStream(), 0)); } catch (InterruptedException e) { log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url); @@ -404,7 +414,8 @@ private void setupResponseReadFailure(String msg, Throwable th) { fail.set(msg); queue.clear(); - queue.offer(new InputStream() + totalBufferedBytes.set(0); + queue.offer(new BufferedStream(new InputStream() { @Override public int read() throws IOException @@ -415,7 +426,7 @@ public int read() throws IOException throw new IOException(msg); } } - }); + }, 0)); } @@ -444,6 +455,33 @@ private void checkTotalBytesLimit(long bytes) throw new RE(msg); } } + + private long getBufferCapacity() + { + return maxBufferSizeBytes - totalBufferedBytes.get(); + } + + private void checkBufferCapacity(long bytes, long timeoutMs) + { + final long startTimeMs = System.currentTimeMillis(); + boolean isTimeout = false; + while (maxBufferSizeBytes < Long.MAX_VALUE && getBufferCapacity() < bytes && !isTimeout) { + if (System.currentTimeMillis() - startTimeMs > timeoutMs) { + isTimeout = true; + } + } + if (isTimeout) { + String msg = StringUtils.format( + "Query[%s] url[%s] max buffer limit reached: waiting for free buffer timeout", + query.getId(), + url + ); + setupResponseReadFailure(msg, null); + throw new RE(msg); + } else { + totalBufferedBytes.addAndGet(bytes); + } + } }; long timeLeft = timeoutAt - System.currentTimeMillis(); @@ -658,6 +696,28 @@ public void close() throws IOException } } + private class BufferedStream + { + private final InputStream inputStream; + private final long bytes; + + public BufferedStream(InputStream is, long size) + { + inputStream = is; + bytes = size; + } + + public InputStream getInputStream() + { + return inputStream; + } + + public long getBytes() + { + return bytes; + } + } + @Override public String toString() { diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java index d5db2cd2c530..3fa157b0811a 100644 --- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java @@ -53,6 +53,14 @@ public class ServerConfig @Min(1) private long maxScatterGatherBytes = Long.MAX_VALUE; + @JsonProperty + @Min(1) + private long maxBufferSizeBytes = Long.MAX_VALUE; + + @JsonProperty + @Min(1) + private long queryBufferingTimeout = 300_000; + public int getNumThreads() { return numThreads; @@ -83,6 +91,16 @@ public long getMaxScatterGatherBytes() return maxScatterGatherBytes; } + public long getMaxBufferSizeBytes() + { + return maxBufferSizeBytes; + } + + public long getQueryBufferingTimeout() + { + return queryBufferingTimeout; + } + @Override public boolean equals(Object o) { @@ -98,6 +116,8 @@ public boolean equals(Object o) enableRequestLimit == that.enableRequestLimit && defaultQueryTimeout == that.defaultQueryTimeout && maxScatterGatherBytes == that.maxScatterGatherBytes && + maxBufferSizeBytes == that.maxBufferSizeBytes && + queryBufferingTimeout == that.queryBufferingTimeout && Objects.equals(maxIdleTime, that.maxIdleTime); } @@ -105,12 +125,25 @@ public boolean equals(Object o) public int hashCode() { return Objects.hash( - numThreads, - queueSize, - enableRequestLimit, - maxIdleTime, - defaultQueryTimeout, - maxScatterGatherBytes + numThreads, + maxIdleTime, + defaultQueryTimeout, + maxScatterGatherBytes, + maxBufferSizeBytes, + queryBufferingTimeout ); } + + @Override + public String toString() + { + return "ServerConfig{" + + "numThreads=" + numThreads + + ", maxIdleTime=" + maxIdleTime + + ", defaultQueryTimeout=" + defaultQueryTimeout + + ", maxScatterGatherBytes=" + maxScatterGatherBytes + + ", maxBufferSizeBytes=" + maxBufferSizeBytes + + ", queryBufferingTimeout=" + queryBufferingTimeout + + '}'; + } } From 812096bc7fcba1d2f7d12bb9c1d3371fbc6a1c1e Mon Sep 17 00:00:00 2001 From: Steven Chen Date: Tue, 31 Oct 2017 11:15:36 +0800 Subject: [PATCH 2/3] refactor: rename --- .../java/io/druid/query/QueryContexts.java | 12 +++---- .../io/druid/client/DirectDruidClient.java | 35 ++++++++----------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index b29d088d0692..e85e3dea4d2d 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -114,16 +114,16 @@ public static String getChunkPeriod(Query query) return query.getContextValue(CHUNK_PERIOD_KEY, "P0D"); } - private static Query withCustomizedArgument(Query query, String key, long value) + private static Query withCustomizedLimit(Query query, String key, long maxLimit) { Object obj = query.getContextValue(key); if (obj == null) { - return query.withOverriddenContext(ImmutableMap.of(key, value)); + return query.withOverriddenContext(ImmutableMap.of(key, maxLimit)); } else { long curr = ((Number) obj).longValue(); - if (curr > value) { + if (curr > maxLimit) { String err = "configured [%s = %s] is more than enforced limit of [%s]."; - throw new IAE(err, key, curr, value); + throw new IAE(err, key, curr, maxLimit); } else { return query; } @@ -132,7 +132,7 @@ private static Query withCustomizedArgument(Query query, String key, l public static Query withMaxScatterGatherBytes(Query query, long maxScatterGatherBytesLimit) { - return withCustomizedArgument(query, MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit); + return withCustomizedLimit(query, MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit); } public static long getMaxScatterGatherBytes(Query query) @@ -142,7 +142,7 @@ public static long getMaxScatterGatherBytes(Query query) public static Query withMaxBufferSizeBytes(Query query, long maxBufferSizeBytes) { - return withCustomizedArgument(query, MAX_BUFFER_SIZE_BYTES, maxBufferSizeBytes); + return withCustomizedLimit(query, MAX_BUFFER_SIZE_BYTES, maxBufferSizeBytes); } public static long getMaxBufferSizeBytes(Query query) diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index e5d39acb38c4..583e3b0514d2 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -247,8 +247,8 @@ public ClientResponse handleResponse(HttpResponse response) { checkQueryTimeout(); - final long bytes = response.getContent().readableBytes(); - checkTotalBytesLimit(bytes); + final long totalBytes = response.getContent().readableBytes(); + checkTotalBytesLimit(totalBytes); log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId()); responseStartTimeNs = System.nanoTime(); @@ -265,8 +265,8 @@ public ClientResponse handleResponse(HttpResponse response) ); } - checkBufferCapacity(bytes, queryBufferingTimeout); - queue.put(new BufferedStream(new ChannelBufferInputStream(response.getContent()), bytes)); + checkBufferCapacity(totalBytes, queryBufferingTimeout); + queue.put(new BufferedStream(new ChannelBufferInputStream(response.getContent()), totalBytes)); } catch (final IOException e) { log.error(e, "Error parsing response context from url [%s]", url); @@ -286,7 +286,7 @@ public int read() throws IOException Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(bytes); + byteCount.addAndGet(totalBytes); return ClientResponse.finished( new SequenceInputStream( new Enumeration() @@ -461,26 +461,21 @@ private long getBufferCapacity() return maxBufferSizeBytes - totalBufferedBytes.get(); } - private void checkBufferCapacity(long bytes, long timeoutMs) + private void checkBufferCapacity(long requestedBytes, long timeoutMs) { final long startTimeMs = System.currentTimeMillis(); - boolean isTimeout = false; - while (maxBufferSizeBytes < Long.MAX_VALUE && getBufferCapacity() < bytes && !isTimeout) { + while (maxBufferSizeBytes < Long.MAX_VALUE && getBufferCapacity() < requestedBytes) { if (System.currentTimeMillis() - startTimeMs > timeoutMs) { - isTimeout = true; + String msg = StringUtils.format( + "Query[%s] url[%s] max buffer limit reached: waiting for free buffer timeout", + query.getId(), + url + ); + setupResponseReadFailure(msg, null); + throw new RE(msg); } } - if (isTimeout) { - String msg = StringUtils.format( - "Query[%s] url[%s] max buffer limit reached: waiting for free buffer timeout", - query.getId(), - url - ); - setupResponseReadFailure(msg, null); - throw new RE(msg); - } else { - totalBufferedBytes.addAndGet(bytes); - } + totalBufferedBytes.addAndGet(requestedBytes); } }; From 9dac62bc83fa2f7f0711ff57288473ea07d017d8 Mon Sep 17 00:00:00 2001 From: Steven Chen Date: Wed, 1 Nov 2017 15:54:18 +0800 Subject: [PATCH 3/3] fix: build failure due to non-static inner class --- server/src/main/java/io/druid/client/DirectDruidClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 583e3b0514d2..f543dfcd13a5 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -691,7 +691,7 @@ public void close() throws IOException } } - private class BufferedStream + private static class BufferedStream { private final InputStream inputStream; private final long bytes;