docs/content/querying/query-context.md (1 change: 1 addition & 0 deletions)
@@ -22,6 +22,7 @@ The query context is used for various query configuration parameters. The follow
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
|serializeDateTimeAsLong| `false` | If true, DateTime is serialized as a long in the results returned by the broker and in data transport between the broker and compute nodes.|
|serializeDateTimeAsLongInner| `false` | If true, DateTime is serialized as a long in data transport between the broker and compute nodes.|
|enableBrokerBackpressure|`false`|If true, the broker refuses to accept new HTTP chunks from compute nodes until it is ready to process more chunks. This can reduce heap memory pressure on the broker for large query results, at the potential expense of query latency, as shown in the example below.|
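For illustration, here is a minimal, hypothetical sketch (not part of this change) of how a caller could turn the flag on from Java; `withOverriddenContext` is part of Druid's `Query` interface, and the constant comes from the `QueryContexts` change below.

```java
// Hypothetical helper, not from this PR: enable broker backpressure on a query via its context.
import java.util.HashMap;
import java.util.Map;

import io.druid.query.Query;
import io.druid.query.QueryContexts;

public class BackpressureContextExample
{
  public static <T> Query<T> withBrokerBackpressure(Query<T> query)
  {
    // The broker later reads this flag back via QueryContexts.isEnableBrokerBackpressure(query).
    final Map<String, Object> context = new HashMap<>();
    context.put(QueryContexts.ENABLE_BROKER_BACKPRESSURE, true);
    return query.withOverriddenContext(context);
  }
}
```

In a native JSON query the same effect comes from adding `"enableBrokerBackpressure": true` to the query's `context` object.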

In addition, some query types offer context parameters specific to that query type.

processing/src/main/java/io/druid/query/QueryContexts.java (8 changes: 7 additions & 1 deletion)
@@ -33,12 +33,14 @@ public class QueryContexts
public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes";
public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
public static final String CHUNK_PERIOD_KEY = "chunkPeriod";
public static final String ENABLE_BROKER_BACKPRESSURE = "enableBrokerBackpressure";

public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
public static final boolean DEFAULT_USE_CACHE = true;
public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true;
public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
public static final boolean DEFAULT_ENABLE_BROKER_BACKPRESSURE = false;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
@@ -170,7 +172,6 @@ public static <T> Query<T> verifyMaxQueryTimeout(Query<T> query, long maxQueryTi
}



public static <T> long getMaxScatterGatherBytes(Query<T> query)
{
return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);
@@ -210,6 +211,11 @@ static <T> long getDefaultTimeout(Query<T> query)
return defaultTimeout;
}

public static <T> boolean isEnableBrokerBackpressure(Query<T> query)
{
return parseBoolean(query, ENABLE_BROKER_BACKPRESSURE, DEFAULT_ENABLE_BROKER_BACKPRESSURE);
}

static <T> long parseLong(Query<T> query, String key, long defaultValue)
{
final Object val = query.getContextValue(key);
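The new getter delegates to a `parseBoolean` helper whose body falls outside this hunk. As a rough sketch, assuming it mirrors the `parseLong` pattern visible above (a missing key falls back to the default, and the value may arrive as a `Boolean` or a `String`), it would look roughly like this:

```java
// Hypothetical sketch only; the real parseBoolean is not shown in this diff.
import io.druid.query.Query;

final class ContextFlagSketch
{
  static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
  {
    final Object val = query.getContextValue(key);
    if (val == null) {
      return defaultValue;                          // key absent: fall back to the default
    }
    if (val instanceof Boolean) {
      return (Boolean) val;                         // already a Boolean in the context map
    }
    return Boolean.parseBoolean(val.toString());    // e.g. "true"/"false" from a JSON context
  }
}
```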
server/src/main/java/io/druid/client/DirectDruidClient.java (82 changes: 52 additions & 30 deletions)
@@ -80,18 +80,19 @@
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

/**
*/
@@ -164,19 +165,27 @@ public int getNumOpenConnections()
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> context)
{
final Query<T> query = queryPlus.getQuery();
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
boolean isBySegment = QueryContexts.isBySegment(query);

Pair<JavaType, JavaType> types = typesMap.get(query.getClass());
if (types == null) {
final TypeFactory typeFactory = objectMapper.getTypeFactory();
JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference());
JavaType bySegmentType = typeFactory.constructParametricType(
Result.class, typeFactory.constructParametricType(BySegmentResultValueClass.class, baseType)
);
types = Pair.of(baseType, bySegmentType);
typesMap.put(query.getClass(), types);
}
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final boolean isBySegment = QueryContexts.isBySegment(query);
final boolean isEnableBrokerBackpressure = QueryContexts.isEnableBrokerBackpressure(query);

final Pair<JavaType, JavaType> types = typesMap.computeIfAbsent(
query.getClass(),
ignored -> {
final TypeFactory typeFactory = objectMapper.getTypeFactory();
final JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference());
final JavaType bySegmentType = typeFactory.constructParametrizedType(
Result.class,
Result.class,
typeFactory.constructParametrizedType(
BySegmentResultValueClass.class,
BySegmentResultValueClass.class,
baseType
)
);
return Pair.of(baseType, bySegmentType);
}
);

final JavaType typeRef;
if (isBySegment) {
@@ -200,8 +209,8 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> c

final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
{
private final AtomicLong byteCount = new AtomicLong(0);
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
private final LongAdder byteCount = new LongAdder();
private final TransferQueue<InputStream> queue = new LinkedTransferQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<String> fail = new AtomicReference<>();

@@ -237,11 +246,16 @@ public ClientResponse<InputStream> handleResponse(HttpResponse response)
)
);
}
queue.put(new ChannelBufferInputStream(response.getContent()));
final InputStream inputStream = new ChannelBufferInputStream(response.getContent());
if (isEnableBrokerBackpressure) {
queue.transfer(inputStream);
} else {
queue.put(inputStream);
}
}
catch (final IOException e) {
log.error(e, "Error parsing response context from url [%s]", url);
return ClientResponse.<InputStream>finished(
return ClientResponse.finished(
new InputStream()
{
@Override
@@ -257,8 +271,8 @@ public int read() throws IOException
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
byteCount.addAndGet(response.getContent().readableBytes());
return ClientResponse.<InputStream>finished(
byteCount.add(response.getContent().readableBytes());
return ClientResponse.finished(
new SequenceInputStream(
new Enumeration<InputStream>()
{
@@ -316,40 +330,47 @@ public ClientResponse<InputStream> handleChunk(

if (bytes > 0) {
try {
queue.put(new ChannelBufferInputStream(channelBuffer));
final InputStream inputStream = new ChannelBufferInputStream(channelBuffer);
if (isEnableBrokerBackpressure) {
queue.transfer(inputStream);
} else {
queue.put(inputStream);
}
}
catch (InterruptedException e) {
log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
byteCount.addAndGet(bytes);
byteCount.add(bytes);
}
return clientResponse;
}

@Override
public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse)
{
long stopTimeNs = System.nanoTime();
long nodeTimeNs = stopTimeNs - requestStartTimeNs;
final long stopTimeNs = System.nanoTime();
final long nodeTimeNs = stopTimeNs - requestStartTimeNs;
final long nodeTimeMs = TimeUnit.NANOSECONDS.toMillis(nodeTimeNs);
final long byteCount = this.byteCount.sum();
log.debug(
"Completed queryId[%s] request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
query.getId(),
url,
byteCount.get(),
byteCount,
nodeTimeMs,
byteCount.get() / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception
byteCount / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception
);
QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
final QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
responseMetrics.reportNodeTime(nodeTimeNs);
responseMetrics.reportNodeBytes(byteCount.get());
responseMetrics.reportNodeBytes(byteCount);
responseMetrics.emit(emitter);
synchronized (done) {
try {
// An empty byte array is put at the end to give SequenceInputStream.close() something to close out
// after done is set to true, regardless of the rest of the stream's state.
// We don't need to "transfer" this one because there are no more results to force backpressure with.
queue.put(ByteSource.empty().openStream());
}
catch (InterruptedException e) {
Expand All @@ -365,7 +386,7 @@ public ClientResponse<InputStream> done(ClientResponse<InputStream> clientRespon
done.set(true);
}
}
return ClientResponse.<InputStream>finished(clientResponse.getObj());
return ClientResponse.finished(clientResponse.getObj());
}

@Override
@@ -384,6 +405,7 @@ private void setupResponseReadFailure(String msg, Throwable th)
{
fail.set(msg);
queue.clear();
// Don't need to transfer because this is a terminal state, so no need to block more items coming in
queue.offer(new InputStream()
{
@Override
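The core of the DirectDruidClient change is swapping the LinkedBlockingQueue for a LinkedTransferQueue: put() hands a chunk off and returns immediately, while transfer() parks the producing (HTTP response) thread until the consumer draining the Sequence actually takes the chunk, which is what throttles further reads when the flag is on. A standalone sketch of that difference, illustrative only and not taken from this diff:

```java
// Demonstrates put() vs transfer() semantics on a LinkedTransferQueue.
// transfer() blocks the producer until a consumer takes the element; put() never blocks the producer.
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class TransferQueueBackpressureDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    final TransferQueue<String> queue = new LinkedTransferQueue<>();

    // Producer: stands in for the I/O thread delivering response chunks.
    final Thread producer = new Thread(() -> {
      try {
        final long start = System.nanoTime();
        queue.transfer("chunk-1"); // blocks here until the consumer takes the chunk
        final long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("transfer() returned after ~" + waitedMs + " ms");
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    // Consumer: stands in for the broker draining the result Sequence, deliberately slow to start.
    Thread.sleep(500);
    System.out.println("consumer took: " + queue.take());

    producer.join();
  }
}
```

With put() in place of transfer(), the producer would return immediately and the chunk would simply sit in the queue; transfer() is what makes a slow consumer stall the producer and, in the broker, stop further HTTP chunks from being accepted.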