Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Druid uses Jetty to serve HTTP requests.
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
|`druid.broker.http.maxBufferSizeBytes`|The maximum number of bytes received from query nodes that the Broker will buffer while executing a query. Unlike `maxScatterGatherBytes`, this advanced setting protects the Broker from running out of memory without aborting the query as soon as the limit is reached: the Broker instead waits for buffer space to become available, for at most `druid.broker.http.queryBufferingTimeout`. This limit can be further reduced at query time using `maxBufferSizeBytes` in the query context.|Long.MAX_VALUE|
|`druid.broker.http.queryBufferingTimeout`|The timeout, in milliseconds, after which the Broker stops waiting for buffer space to accept data from query nodes and aborts the query. This timeout can be adjusted at query time using `queryBufferingTimeout` in the query context.|300000|

#### Retry Policy

Expand Down
43 changes: 33 additions & 10 deletions processing/src/main/java/io/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class QueryContexts
public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes";
public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
public static final String CHUNK_PERIOD_KEY = "chunkPeriod";
public static final String MAX_BUFFER_SIZE_BYTES = "maxBufferSizeBytes";
public static final String QUERY_BUFFERING_TIMEOUT_KEY = "queryBufferingTimeout";

public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
Expand Down Expand Up @@ -112,31 +114,52 @@ public static <T> String getChunkPeriod(Query<T> query)
return query.getContextValue(CHUNK_PERIOD_KEY, "P0D");
}

public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
private static <T> Query<T> withCustomizedLimit(Query<T> query, String key, long maxLimit)
{
Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);
Object obj = query.getContextValue(key);
if (obj == null) {
return query.withOverriddenContext(ImmutableMap.of(MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit));
return query.withOverriddenContext(ImmutableMap.of(key, maxLimit));
} else {
long curr = ((Number) obj).longValue();
if (curr > maxScatterGatherBytesLimit) {
throw new IAE(
"configured [%s = %s] is more than enforced limit of [%s].",
MAX_SCATTER_GATHER_BYTES_KEY,
curr,
maxScatterGatherBytesLimit
);
if (curr > maxLimit) {
String err = "configured [%s = %s] is more than enforced limit of [%s].";
throw new IAE(err, key, curr, maxLimit);
} else {
return query;
}
}
}

/**
 * Applies the scatter/gather byte limit to a query's context.
 *
 * Delegates to {@code withCustomizedLimit} using {@link #MAX_SCATTER_GATHER_BYTES_KEY} as the context key.
 *
 * @param query                      the query whose context is inspected
 * @param maxScatterGatherBytesLimit the limit passed through to {@code withCustomizedLimit}
 * @return the query returned by {@code withCustomizedLimit}
 */
public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
{
  final Query<T> limited = withCustomizedLimit(query, MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit);
  return limited;
}

/**
 * Looks up {@link #MAX_SCATTER_GATHER_BYTES_KEY} in the query context through {@code parseLong}.
 *
 * @param query the query whose context is read
 * @return the value produced by {@code parseLong}; {@code Long.MAX_VALUE} is supplied as its default argument
 */
public static <T> long getMaxScatterGatherBytes(Query<T> query)
{
  final long unlimited = Long.MAX_VALUE;
  return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, unlimited);
}

/**
 * Applies the buffer-size byte limit to a query's context.
 *
 * Delegates to {@code withCustomizedLimit} using {@link #MAX_BUFFER_SIZE_BYTES} as the context key.
 *
 * @param query              the query whose context is inspected
 * @param maxBufferSizeBytes the limit passed through to {@code withCustomizedLimit}
 * @return the query returned by {@code withCustomizedLimit}
 */
public static <T> Query<T> withMaxBufferSizeBytes(Query<T> query, long maxBufferSizeBytes)
{
  final Query<T> limited = withCustomizedLimit(query, MAX_BUFFER_SIZE_BYTES, maxBufferSizeBytes);
  return limited;
}

/**
 * Looks up {@link #MAX_BUFFER_SIZE_BYTES} in the query context through {@code parseLong}.
 *
 * @param query the query whose context is read
 * @return the value produced by {@code parseLong}; {@code Long.MAX_VALUE} is supplied as its default argument
 */
public static <T> long getMaxBufferSizeBytes(Query<T> query)
{
  final long unlimited = Long.MAX_VALUE;
  return parseLong(query, MAX_BUFFER_SIZE_BYTES, unlimited);
}

/**
 * Ensures the query context carries a {@link #QUERY_BUFFERING_TIMEOUT_KEY} entry.
 *
 * A value already present in the query context is kept, so that a timeout chosen at query time is not
 * overwritten by the server-side default. Only when the key is absent is {@code bufferingTimeout}
 * written into the context.
 *
 * @param query            the query whose context is inspected
 * @param bufferingTimeout the timeout to use when the query context does not specify one
 * @return {@code query} itself when it already defines the key, otherwise a copy with the key set to
 *         {@code bufferingTimeout}
 */
public static <T> Query<T> withQueryBufferingTimeout(Query<T> query, long bufferingTimeout)
{
  final Object existing = query.getContextValue(QUERY_BUFFERING_TIMEOUT_KEY);
  if (existing != null) {
    // Respect the per-query setting instead of clobbering it with the server default.
    return query;
  }
  return query.withOverriddenContext(ImmutableMap.of(QUERY_BUFFERING_TIMEOUT_KEY, bufferingTimeout));
}

/**
 * Looks up {@link #QUERY_BUFFERING_TIMEOUT_KEY} in the query context through {@code parseLong}.
 *
 * @param query the query whose context is read
 * @return the value produced by {@code parseLong}; {@code DEFAULT_TIMEOUT_MILLIS} is supplied as its
 *         default argument
 */
public static <T> long getQueryBufferingTimeout(Query<T> query)
{
  final long fallbackMillis = DEFAULT_TIMEOUT_MILLIS;
  return parseLong(query, QUERY_BUFFERING_TIMEOUT_KEY, fallbackMillis);
}

public static <T> boolean hasTimeout(Query<T> query)
{
return getTimeout(query) != NO_TIMEOUT;
Expand Down
85 changes: 70 additions & 15 deletions server/src/main/java/io/druid/client/DirectDruidClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ public static <T, QueryType extends Query<T>> QueryType withDefaultTimeoutAndMax
{
return (QueryType) QueryContexts.withMaxScatterGatherBytes(
QueryContexts.withDefaultTimeout(
(Query) query,
QueryContexts.withQueryBufferingTimeout(
QueryContexts.withMaxBufferSizeBytes((Query) query, serverConfig.getMaxBufferSizeBytes()),
serverConfig.getQueryBufferingTimeout()),
serverConfig.getDefaultQueryTimeout()
),
serverConfig.getMaxScatterGatherBytes()
Expand Down Expand Up @@ -214,14 +216,17 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> c

final long requestStartTimeNs = System.nanoTime();

long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue();
long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query);
final long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue();
final long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query);
final long maxBufferSizeBytes = QueryContexts.getMaxBufferSizeBytes(query);
final long queryBufferingTimeout = QueryContexts.getQueryBufferingTimeout(query);
AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED);
AtomicLong totalBufferedBytes = new AtomicLong(0);

final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
{
private final AtomicLong byteCount = new AtomicLong(0);
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
private final BlockingQueue<BufferedStream> queue = new LinkedBlockingQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<String> fail = new AtomicReference<>();

Expand All @@ -241,7 +246,9 @@ private QueryMetrics<? super Query<T>> acquireResponseMetrics()
public ClientResponse<InputStream> handleResponse(HttpResponse response)
{
checkQueryTimeout();
checkTotalBytesLimit(response.getContent().readableBytes());

final long totalBytes = response.getContent().readableBytes();
checkTotalBytesLimit(totalBytes);

log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId());
responseStartTimeNs = System.nanoTime();
Expand All @@ -257,7 +264,9 @@ public ClientResponse<InputStream> handleResponse(HttpResponse response)
)
);
}
queue.put(new ChannelBufferInputStream(response.getContent()));

checkBufferCapacity(totalBytes, queryBufferingTimeout);
queue.put(new BufferedStream(new ChannelBufferInputStream(response.getContent()), totalBytes));
}
catch (final IOException e) {
log.error(e, "Error parsing response context from url [%s]", url);
Expand All @@ -277,7 +286,7 @@ public int read() throws IOException
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
byteCount.addAndGet(response.getContent().readableBytes());
byteCount.addAndGet(totalBytes);
return ClientResponse.<InputStream>finished(
new SequenceInputStream(
new Enumeration<InputStream>()
Expand Down Expand Up @@ -305,9 +314,10 @@ public InputStream nextElement()
}

try {
InputStream is = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
if (is != null) {
return is;
BufferedStream stream = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
totalBufferedBytes.addAndGet(-stream.getBytes());
if (stream.getInputStream() != null) {
return stream.getInputStream();
} else {
throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
}
Expand All @@ -331,12 +341,12 @@ public ClientResponse<InputStream> handleChunk(

final ChannelBuffer channelBuffer = chunk.getContent();
final int bytes = channelBuffer.readableBytes();

checkTotalBytesLimit(bytes);

if (bytes > 0) {
try {
queue.put(new ChannelBufferInputStream(channelBuffer));
checkBufferCapacity(bytes, queryBufferingTimeout);
queue.put(new BufferedStream(new ChannelBufferInputStream(channelBuffer), bytes));
}
catch (InterruptedException e) {
log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
Expand Down Expand Up @@ -370,7 +380,7 @@ public ClientResponse<InputStream> done(ClientResponse<InputStream> clientRespon
try {
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
// after done is set to true, regardless of the rest of the stream's state.
queue.put(ByteSource.empty().openStream());
queue.put(new BufferedStream(ByteSource.empty().openStream(), 0));
}
catch (InterruptedException e) {
log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
Expand Down Expand Up @@ -404,7 +414,8 @@ private void setupResponseReadFailure(String msg, Throwable th)
{
fail.set(msg);
queue.clear();
queue.offer(new InputStream()
totalBufferedBytes.set(0);
queue.offer(new BufferedStream(new InputStream()
{
@Override
public int read() throws IOException
Expand All @@ -415,7 +426,7 @@ public int read() throws IOException
throw new IOException(msg);
}
}
});
}, 0));

}

Expand Down Expand Up @@ -444,6 +455,28 @@ private void checkTotalBytesLimit(long bytes)
throw new RE(msg);
}
}

/**
 * Computes how many more bytes may be buffered right now.
 *
 * @return {@code maxBufferSizeBytes} minus the current value of {@code totalBufferedBytes}
 */
private long getBufferCapacity()
{
  final long currentlyBuffered = totalBufferedBytes.get();
  return maxBufferSizeBytes - currentlyBuffered;
}

/**
 * Reserves {@code requestedBytes} of buffer space, waiting for room if the buffer is currently full.
 *
 * When {@code maxBufferSizeBytes} is {@code Long.MAX_VALUE} the limit is treated as disabled and the
 * bytes are reserved immediately. Otherwise this method polls {@link #getBufferCapacity()} until enough
 * space is free, sleeping briefly between polls rather than spinning, so that waiting does not occupy a
 * CPU core for the whole timeout.
 *
 * @param requestedBytes number of bytes about to be enqueued
 * @param timeoutMs      maximum time, in milliseconds, to wait for free buffer space
 * @throws RE if the wait exceeds {@code timeoutMs} or the waiting thread is interrupted; in both cases
 *            {@code setupResponseReadFailure} is invoked first with the same message
 */
private void checkBufferCapacity(long requestedBytes, long timeoutMs)
{
  if (maxBufferSizeBytes < Long.MAX_VALUE) {
    // Short back-off between capacity checks; keeps reaction time low without a hot loop.
    final long pollIntervalMs = 10;
    final long startTimeMs = System.currentTimeMillis();
    while (getBufferCapacity() < requestedBytes) {
      if (System.currentTimeMillis() - startTimeMs > timeoutMs) {
        String msg = StringUtils.format(
            "Query[%s] url[%s] max buffer limit reached: waiting for free buffer timeout",
            query.getId(),
            url
        );
        setupResponseReadFailure(msg, null);
        throw new RE(msg);
      }
      try {
        Thread.sleep(pollIntervalMs);
      }
      catch (InterruptedException e) {
        // Preserve the interrupt status for callers and stop waiting: the query cannot proceed.
        Thread.currentThread().interrupt();
        String msg = StringUtils.format(
            "Query[%s] url[%s] interrupted while waiting for free buffer",
            query.getId(),
            url
        );
        setupResponseReadFailure(msg, e);
        throw new RE(msg);
      }
    }
  }
  totalBufferedBytes.addAndGet(requestedBytes);
}
};

long timeLeft = timeoutAt - System.currentTimeMillis();
Expand Down Expand Up @@ -658,6 +691,28 @@ public void close() throws IOException
}
}

/**
 * Immutable pairing of an {@link InputStream} with a byte count supplied by the creator.
 */
private static class BufferedStream
{
  private final InputStream inputStream;
  private final long bytes;

  /**
   * @param is   the stream to hold; stored as given
   * @param size the byte count to report from {@link #getBytes()}
   */
  public BufferedStream(InputStream is, long size)
  {
    this.inputStream = is;
    this.bytes = size;
  }

  /**
   * @return the stream passed to the constructor
   */
  public InputStream getInputStream()
  {
    return this.inputStream;
  }

  /**
   * @return the byte count passed to the constructor
   */
  public long getBytes()
  {
    return this.bytes;
  }
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public class ServerConfig
@Min(1)
private long maxScatterGatherBytes = Long.MAX_VALUE;

// Server-side limit handed to QueryContexts.withMaxBufferSizeBytes by DirectDruidClient.
// Must be at least 1. The default, Long.MAX_VALUE, makes DirectDruidClient skip the
// wait-for-buffer-space loop entirely.
@JsonProperty
@Min(1)
private long maxBufferSizeBytes = Long.MAX_VALUE;

// Server-side value handed to QueryContexts.withQueryBufferingTimeout by DirectDruidClient.
// Interpreted in milliseconds (it is compared against System.currentTimeMillis() deltas).
// Must be at least 1; the default is 300,000 ms.
@JsonProperty
@Min(1)
private long queryBufferingTimeout = 300_000;

public int getNumThreads()
{
return numThreads;
Expand Down Expand Up @@ -83,6 +91,16 @@ public long getMaxScatterGatherBytes()
return maxScatterGatherBytes;
}

/**
 * @return the configured {@code maxBufferSizeBytes} value
 */
public long getMaxBufferSizeBytes()
{
  return this.maxBufferSizeBytes;
}

/**
 * @return the configured {@code queryBufferingTimeout} value
 */
public long getQueryBufferingTimeout()
{
  return this.queryBufferingTimeout;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -98,19 +116,34 @@ public boolean equals(Object o)
enableRequestLimit == that.enableRequestLimit &&
defaultQueryTimeout == that.defaultQueryTimeout &&
maxScatterGatherBytes == that.maxScatterGatherBytes &&
maxBufferSizeBytes == that.maxBufferSizeBytes &&
queryBufferingTimeout == that.queryBufferingTimeout &&
Objects.equals(maxIdleTime, that.maxIdleTime);
}

@Override
public int hashCode()
{
return Objects.hash(
numThreads,
queueSize,
enableRequestLimit,
maxIdleTime,
defaultQueryTimeout,
maxScatterGatherBytes
numThreads,
maxIdleTime,
defaultQueryTimeout,
maxScatterGatherBytes,
maxBufferSizeBytes,
queryBufferingTimeout
);
}

/**
 * Renders the listed configuration fields as {@code ServerConfig{name=value, ...}}.
 *
 * @return a single-line description of this configuration
 */
@Override
public String toString()
{
  final StringBuilder sb = new StringBuilder("ServerConfig{");
  sb.append("numThreads=").append(numThreads);
  sb.append(", maxIdleTime=").append(maxIdleTime);
  sb.append(", defaultQueryTimeout=").append(defaultQueryTimeout);
  sb.append(", maxScatterGatherBytes=").append(maxScatterGatherBytes);
  sb.append(", maxBufferSizeBytes=").append(maxBufferSizeBytes);
  sb.append(", queryBufferingTimeout=").append(queryBufferingTimeout);
  sb.append('}');
  return sb.toString();
}
}