diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index d4e2be28f123..8aee6744c398 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -22,6 +22,7 @@ The query context is used for various query configuration parameters. The follow |chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | |serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the result returned by broker and the data transportation between broker and compute node| |serializeDateTimeAsLongInner| `false` | If true, DateTime is serialized as long in the data transportation between broker and compute node| +|enableBrokerBackpressure|`false`|If true, brokers will refuse to accept new http chunks from query nodes until it is ready to process more chunks. This can reduce heap memory pressure on brokers for large query results at the potential expense of query latency| In addition, some query types offer context parameters specific to that query type. diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index 4252ad4537da..a8083e01fa61 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -33,12 +33,14 @@ public class QueryContexts public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes"; public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; + public static final String ENABLE_BROKER_BACKPRESSURE = "enableBrokerBackpressure"; public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; public static final boolean DEFAULT_USE_CACHE = true; public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true; public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true; + public static final boolean DEFAULT_ENABLE_BROKER_BACKPRESSURE = false; public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes @@ -170,7 +172,6 @@ public static Query verifyMaxQueryTimeout(Query query, long maxQueryTi } - public static long getMaxScatterGatherBytes(Query query) { return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); @@ -210,6 +211,11 @@ static long getDefaultTimeout(Query query) return defaultTimeout; } + public static boolean isEnableBrokerBackpressure(Query query) + { + return parseBoolean(query, ENABLE_BROKER_BACKPRESSURE, DEFAULT_ENABLE_BROKER_BACKPRESSURE); + } + static long parseLong(Query query, String key, long defaultValue) { final Object val = query.getContextValue(key); diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 72607f91bf4c..f64a9b1555e1 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -80,18 +80,19 @@ import java.util.Enumeration; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; /** */ @@ -164,19 +165,27 @@ public int getNumOpenConnections() public Sequence run(final QueryPlus queryPlus, final Map context) { final Query query = queryPlus.getQuery(); - QueryToolChest> toolChest = warehouse.getToolChest(query); - boolean isBySegment = QueryContexts.isBySegment(query); - - Pair types = typesMap.get(query.getClass()); - if (types == null) { - final TypeFactory typeFactory = objectMapper.getTypeFactory(); - JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference()); - JavaType bySegmentType = typeFactory.constructParametricType( - Result.class, typeFactory.constructParametricType(BySegmentResultValueClass.class, baseType) - ); - types = Pair.of(baseType, bySegmentType); - typesMap.put(query.getClass(), types); - } + final QueryToolChest> toolChest = warehouse.getToolChest(query); + final boolean isBySegment = QueryContexts.isBySegment(query); + final boolean isEnableBrokerBackpressure = QueryContexts.isEnableBrokerBackpressure(query); + + final Pair types = typesMap.computeIfAbsent( + query.getClass(), + ignored -> { + final TypeFactory typeFactory = objectMapper.getTypeFactory(); + final JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference()); + final JavaType bySegmentType = typeFactory.constructParametrizedType( + Result.class, + Result.class, + typeFactory.constructParametrizedType( + BySegmentResultValueClass.class, + BySegmentResultValueClass.class, + baseType + ) + ); + return Pair.of(baseType, bySegmentType); + } + ); final JavaType typeRef; if (isBySegment) { @@ -200,8 +209,8 @@ public Sequence run(final QueryPlus queryPlus, final Map c final HttpResponseHandler responseHandler = new HttpResponseHandler() { - private final AtomicLong byteCount = new AtomicLong(0); - private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final LongAdder byteCount = new LongAdder(); + private final TransferQueue queue = new LinkedTransferQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); private final AtomicReference fail = new AtomicReference<>(); @@ -237,11 +246,16 @@ public ClientResponse handleResponse(HttpResponse response) ) ); } - queue.put(new ChannelBufferInputStream(response.getContent())); + final InputStream inputStream = new ChannelBufferInputStream(response.getContent()); + if (isEnableBrokerBackpressure) { + queue.transfer(inputStream); + } else { + queue.put(inputStream); + } } catch (final IOException e) { log.error(e, "Error parsing response context from url [%s]", url); - return ClientResponse.finished( + return ClientResponse.finished( new InputStream() { @Override @@ -257,8 +271,8 @@ public int read() throws IOException Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(response.getContent().readableBytes()); - return ClientResponse.finished( + byteCount.add(response.getContent().readableBytes()); + return ClientResponse.finished( new SequenceInputStream( new Enumeration() { @@ -316,14 +330,19 @@ public ClientResponse handleChunk( if (bytes > 0) { try { - queue.put(new ChannelBufferInputStream(channelBuffer)); + final InputStream inputStream = new ChannelBufferInputStream(channelBuffer); + if (isEnableBrokerBackpressure) { + queue.transfer(inputStream); + } else { + queue.put(inputStream); + } } catch (InterruptedException e) { log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url); Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(bytes); + byteCount.add(bytes); } return clientResponse; } @@ -331,25 +350,27 @@ public ClientResponse handleChunk( @Override public ClientResponse done(ClientResponse clientResponse) { - long stopTimeNs = System.nanoTime(); - long nodeTimeNs = stopTimeNs - requestStartTimeNs; + final long stopTimeNs = System.nanoTime(); + final long nodeTimeNs = stopTimeNs - requestStartTimeNs; final long nodeTimeMs = TimeUnit.NANOSECONDS.toMillis(nodeTimeNs); + final long byteCount = this.byteCount.sum(); log.debug( "Completed queryId[%s] request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].", query.getId(), url, - byteCount.get(), + byteCount, nodeTimeMs, - byteCount.get() / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception + byteCount / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception ); - QueryMetrics> responseMetrics = acquireResponseMetrics(); + final QueryMetrics> responseMetrics = acquireResponseMetrics(); responseMetrics.reportNodeTime(nodeTimeNs); - responseMetrics.reportNodeBytes(byteCount.get()); + responseMetrics.reportNodeBytes(byteCount); responseMetrics.emit(emitter); synchronized (done) { try { // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out // after done is set to true, regardless of the rest of the stream's state. + // We don't need to "transfer" this one because there's no more results to force backpressure with queue.put(ByteSource.empty().openStream()); } catch (InterruptedException e) { @@ -365,7 +386,7 @@ public ClientResponse done(ClientResponse clientRespon done.set(true); } } - return ClientResponse.finished(clientResponse.getObj()); + return ClientResponse.finished(clientResponse.getObj()); } @Override @@ -384,6 +405,7 @@ private void setupResponseReadFailure(String msg, Throwable th) { fail.set(msg); queue.clear(); + // Don't need to transfer because this is a terminal state, so no need to block more items coming in queue.offer(new InputStream() { @Override