From 220d945b95e5c3af6ad2d18e00622b03cb8e6a22 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 16 Jul 2018 13:52:24 -0700 Subject: [PATCH 1/3] Allow queries to request backpressure to historicals in query context --- .../java/io/druid/query/QueryContexts.java | 6 ++ .../io/druid/client/DirectDruidClient.java | 66 +++++++++++-------- 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index 4252ad4537da..7160b8b5204c 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -33,12 +33,14 @@ public class QueryContexts public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes"; public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; + public static final String ENABLE_BROKER_BACKPRESSURE = "enableBrokerBackpressure"; public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; public static final boolean DEFAULT_USE_CACHE = true; public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true; public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true; + public static final boolean DEFAULT_ENABLE_BROKER_BACKPRESSURE = false; public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes @@ -210,6 +212,10 @@ static long getDefaultTimeout(Query query) return defaultTimeout; } + public static boolean isEnableBrokerBackpressure(Query query) { + return parseBoolean(query, ENABLE_BROKER_BACKPRESSURE, DEFAULT_ENABLE_BROKER_BACKPRESSURE); + } + static long parseLong(Query query, String key, long defaultValue) { final Object val = query.getContextValue(key); diff --git 
a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 72607f91bf4c..ab711dba8d23 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -80,18 +80,19 @@ import java.util.Enumeration; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; /** */ @@ -164,19 +165,27 @@ public int getNumOpenConnections() public Sequence run(final QueryPlus queryPlus, final Map context) { final Query query = queryPlus.getQuery(); - QueryToolChest> toolChest = warehouse.getToolChest(query); - boolean isBySegment = QueryContexts.isBySegment(query); - - Pair types = typesMap.get(query.getClass()); - if (types == null) { - final TypeFactory typeFactory = objectMapper.getTypeFactory(); - JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference()); - JavaType bySegmentType = typeFactory.constructParametricType( - Result.class, typeFactory.constructParametricType(BySegmentResultValueClass.class, baseType) - ); - types = Pair.of(baseType, bySegmentType); - typesMap.put(query.getClass(), types); - } + final QueryToolChest> toolChest = warehouse.getToolChest(query); + final boolean isBySegment = QueryContexts.isBySegment(query); + 
final boolean isEnableBrokerBackpressure = QueryContexts.isEnableBrokerBackpressure(query); + + final Pair types = typesMap.computeIfAbsent( + query.getClass(), + ignored -> { + final TypeFactory typeFactory = objectMapper.getTypeFactory(); + final JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference()); + final JavaType bySegmentType = typeFactory.constructParametrizedType( + Result.class, + Result.class, + typeFactory.constructParametrizedType( + BySegmentResultValueClass.class, + BySegmentResultValueClass.class, + baseType + ) + ); + return Pair.of(baseType, bySegmentType); + } + ); final JavaType typeRef; if (isBySegment) { @@ -200,8 +209,8 @@ public Sequence run(final QueryPlus queryPlus, final Map c final HttpResponseHandler responseHandler = new HttpResponseHandler() { - private final AtomicLong byteCount = new AtomicLong(0); - private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final LongAdder byteCount = new LongAdder(); + private final TransferQueue queue = new LinkedTransferQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); private final AtomicReference fail = new AtomicReference<>(); @@ -241,7 +250,7 @@ public ClientResponse handleResponse(HttpResponse response) } catch (final IOException e) { log.error(e, "Error parsing response context from url [%s]", url); - return ClientResponse.finished( + return ClientResponse.finished( new InputStream() { @Override @@ -257,8 +266,8 @@ public int read() throws IOException Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(response.getContent().readableBytes()); - return ClientResponse.finished( + byteCount.add(response.getContent().readableBytes()); + return ClientResponse.finished( new SequenceInputStream( new Enumeration() { @@ -323,7 +332,7 @@ public ClientResponse handleChunk( Thread.currentThread().interrupt(); throw Throwables.propagate(e); } - byteCount.addAndGet(bytes); + byteCount.add(bytes); } 
return clientResponse; } @@ -331,20 +340,21 @@ public ClientResponse handleChunk( @Override public ClientResponse done(ClientResponse clientResponse) { - long stopTimeNs = System.nanoTime(); - long nodeTimeNs = stopTimeNs - requestStartTimeNs; + final long stopTimeNs = System.nanoTime(); + final long nodeTimeNs = stopTimeNs - requestStartTimeNs; final long nodeTimeMs = TimeUnit.NANOSECONDS.toMillis(nodeTimeNs); + final long byteCount = this.byteCount.sum(); log.debug( "Completed queryId[%s] request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].", query.getId(), url, - byteCount.get(), + byteCount, nodeTimeMs, - byteCount.get() / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception + byteCount / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception ); - QueryMetrics> responseMetrics = acquireResponseMetrics(); + final QueryMetrics> responseMetrics = acquireResponseMetrics(); responseMetrics.reportNodeTime(nodeTimeNs); - responseMetrics.reportNodeBytes(byteCount.get()); + responseMetrics.reportNodeBytes(byteCount); responseMetrics.emit(emitter); synchronized (done) { try { @@ -365,7 +375,7 @@ public ClientResponse done(ClientResponse clientRespon done.set(true); } } - return ClientResponse.finished(clientResponse.getObj()); + return ClientResponse.finished(clientResponse.getObj()); } @Override From c13afc195b1ad5378f34f1efe3619151d3b78d0b Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 17 Jul 2018 15:29:43 -0700 Subject: [PATCH 2/3] Add optional transfer queue operations in direct druid client * Add docs on query context --- docs/content/querying/query-context.md | 1 + .../java/io/druid/client/DirectDruidClient.java | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index d4e2be28f123..8aee6744c398 100644 --- a/docs/content/querying/query-context.md +++ 
b/docs/content/querying/query-context.md @@ -22,6 +22,7 @@ The query context is used for various query configuration parameters. The follow |chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | |serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the result returned by broker and the data transportation between broker and compute node| |serializeDateTimeAsLongInner| `false` | If true, DateTime is serialized as long in the data transportation between broker and compute node| +|enableBrokerBackpressure|`false`|If true, brokers will refuse to accept new http chunks from query nodes until they are ready to process more chunks. This can reduce heap memory pressure on brokers for large query results at the potential expense of query latency| In addition, some query types offer context parameters specific to that query type.
diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index ab711dba8d23..f64a9b1555e1 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -246,7 +246,12 @@ public ClientResponse handleResponse(HttpResponse response) ) ); } - queue.put(new ChannelBufferInputStream(response.getContent())); + final InputStream inputStream = new ChannelBufferInputStream(response.getContent()); + if (isEnableBrokerBackpressure) { + queue.transfer(inputStream); + } else { + queue.put(inputStream); + } } catch (final IOException e) { log.error(e, "Error parsing response context from url [%s]", url); @@ -325,7 +330,12 @@ public ClientResponse handleChunk( if (bytes > 0) { try { - queue.put(new ChannelBufferInputStream(channelBuffer)); + final InputStream inputStream = new ChannelBufferInputStream(channelBuffer); + if (isEnableBrokerBackpressure) { + queue.transfer(inputStream); + } else { + queue.put(inputStream); + } } catch (InterruptedException e) { log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url); @@ -360,6 +370,7 @@ public ClientResponse done(ClientResponse clientRespon try { // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out // after done is set to true, regardless of the rest of the stream's state. 
+ // We don't need to "transfer" this one because there's no more results to force backpressure with queue.put(ByteSource.empty().openStream()); } catch (InterruptedException e) { @@ -394,6 +405,7 @@ private void setupResponseReadFailure(String msg, Throwable th) { fail.set(msg); queue.clear(); + // Don't need to transfer because this is a terminal state, so no need to block more items coming in queue.offer(new InputStream() { @Override From f988ef2e5ab5a3adbd835241733ec3e42d57e7a2 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 17 Jul 2018 15:53:31 -0700 Subject: [PATCH 3/3] Code formatting --- processing/src/main/java/io/druid/query/QueryContexts.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index 7160b8b5204c..a8083e01fa61 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -172,7 +172,6 @@ public static Query verifyMaxQueryTimeout(Query query, long maxQueryTi } - public static long getMaxScatterGatherBytes(Query query) { return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); @@ -212,7 +211,8 @@ static long getDefaultTimeout(Query query) return defaultTimeout; } - public static boolean isEnableBrokerBackpressure(Query query) { + public static boolean isEnableBrokerBackpressure(Query query) + { return parseBoolean(query, ENABLE_BROKER_BACKPRESSURE, DEFAULT_ENABLE_BROKER_BACKPRESSURE); }