1 change: 1 addition & 0 deletions docs/content/configuration/broker.md
@@ -37,6 +37,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.server.http.numThreads`|Number of threads for HTTP requests.|10|
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This is an advanced configuration that protects against OOMs when the broker is under heavy load and cannot consume the gathered data fast enough. The limit can be further reduced at query time using `maxScatterGatherBytes` in the query context. Note that a large limit is not necessarily bad if the broker is never under heavy concurrent load; in that case gathered data is processed quickly, freeing up the memory used.|Long.MAX_VALUE|
Contributor

I wonder why this property is prefixed by druid.server instead of druid.broker. Is it planned to be applied to other node types?

Contributor Author

I don't think it makes sense for any other node type. And now that you mention it, it probably would have made more sense to call it `druid.broker.http.maxScatterGatherBytes`. We can possibly change it in the future, or remove this config if the backpressure story improves.

Contributor

Thanks. It sounds good.

|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
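As an illustration only (not part of this diff), a broker `runtime.properties` might cap scatter-gather memory like this; the 512 MB value is an assumption to tune against your broker heap:

```properties
# Hypothetical broker runtime.properties excerpt.
# Cap the total bytes buffered from historicals/realtime processes per query.
# 536870912 bytes = 512 MB -- an illustrative value, not a recommendation.
druid.server.http.maxScatterGatherBytes=536870912
```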
1 change: 1 addition & 0 deletions docs/content/querying/query-context.md
@@ -10,6 +10,7 @@ The query context is used for various query configuration parameters. The follow
|property |default | description |
|-----------------|----------------------------------------|----------------------|
|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) |
|maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This parameter can be used to further reduce the `maxScatterGatherBytes` limit at query time. See [broker configuration](broker.html) for more details.|
|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.|
|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query |
|useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache |
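For illustration (not part of this diff), a query can lower the limit for itself via the context; the dataSource, interval, and byte value below are assumptions:

```json
{
  "queryType": "timeseries",
  "dataSource": "wikipedia",
  "intervals": ["2017-01-01/2017-01-02"],
  "granularity": "all",
  "aggregations": [{ "type": "count", "name": "rows" }],
  "context": {
    "timeout": 60000,
    "maxScatterGatherBytes": 134217728
  }
}
```

The query-time value may only reduce the server-configured limit; asking for more than `druid.server.http.maxScatterGatherBytes` fails the query (see `QueryContexts.withMaxScatterGatherBytes` below).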
32 changes: 32 additions & 0 deletions processing/src/main/java/io/druid/query/QueryContexts.java
@@ -21,12 +21,14 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;

public class QueryContexts
{
public static final String PRIORITY_KEY = "priority";
public static final String TIMEOUT_KEY = "timeout";
public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes";
public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
public static final String CHUNK_PERIOD_KEY = "chunkPeriod";

@@ -98,6 +100,31 @@ public static <T> String getChunkPeriod(Query<T> query)
return query.getContextValue(CHUNK_PERIOD_KEY, "P0D");
}

public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
{
Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);
if (obj == null) {
return query.withOverriddenContext(ImmutableMap.of(MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit));
} else {
long curr = ((Number) obj).longValue();
if (curr > maxScatterGatherBytesLimit) {
throw new IAE(
"configured [%s = %s] is more than enforced limit of [%s].",
MAX_SCATTER_GATHER_BYTES_KEY,
curr,
maxScatterGatherBytesLimit
);
} else {
return query;
}
}
}

public static <T> long getMaxScatterGatherBytes(Query<T> query)
{
return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);
}

public static <T> boolean hasTimeout(Query<T> query)
{
return getTimeout(query) != NO_TIMEOUT;
@@ -115,6 +142,11 @@ public static <T> long getTimeout(Query<T> query, long defaultTimeout)
return timeout;
}

public static <T> Query<T> withTimeout(Query<T> query, long timeout)
{
return query.withOverriddenContext(ImmutableMap.of(TIMEOUT_KEY, timeout));
}

public static <T> Query<T> withDefaultTimeout(Query<T> query, long defaultTimeout)
{
return query.withOverriddenContext(ImmutableMap.of(QueryContexts.DEFAULT_TIMEOUT_KEY, defaultTimeout));
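A minimal sketch of how these helpers compose on the broker side; the wrapper class and the 512 MB limit are hypothetical, and only the `QueryContexts` calls are from this patch:

```java
import io.druid.query.Query;
import io.druid.query.QueryContexts;

// Hypothetical broker-side usage of the new helpers.
public class ScatterGatherLimitExample
{
  public static <T> Query<T> bound(Query<T> query)
  {
    long serverLimit = 512L * 1024 * 1024; // e.g. druid.server.http.maxScatterGatherBytes

    // Stamps the server limit into the context, or throws IAE if the query's own
    // maxScatterGatherBytes exceeds the server-enforced limit.
    Query<T> bounded = QueryContexts.withMaxScatterGatherBytes(query, serverLimit);

    // Reads the effective limit back out (Long.MAX_VALUE when unset).
    long effective = QueryContexts.getMaxScatterGatherBytes(bounded);
    assert effective <= serverLimit;

    // The remaining timeout can be forwarded to data nodes the same way.
    return QueryContexts.withTimeout(bounded, 60_000L);
  }
}
```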
processing/src/main/java/io/druid/query/ResourceLimitExceededException.java
@@ -19,6 +19,8 @@

package io.druid.query;

import io.druid.common.utils.StringUtils;

/**
* Exception indicating that an operation failed because it exceeded some configured resource limit.
*
@@ -27,8 +29,8 @@
*/
public class ResourceLimitExceededException extends RuntimeException
{
public ResourceLimitExceededException(String message)
public ResourceLimitExceededException(String message, Object... arguments)
{
super(message);
super(StringUtils.safeFormat(message, arguments));
}
}
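With the varargs constructor, call sites can pass a format string and arguments directly; a hypothetical call site (both identifiers are assumptions):

```java
// Hypothetical call site; formatting is delegated to StringUtils.safeFormat,
// which presumably does not itself throw if the pattern and arguments mismatch.
throw new ResourceLimitExceededException(
    "Query[%s] exceeded maxScatterGatherBytes limit of [%d]",
    queryId,
    maxScatterGatherBytes
);
```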
150 changes: 126 additions & 24 deletions server/src/main/java/io/druid/client/DirectDruidClient.java
@@ -42,6 +42,7 @@
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.common.utils.StringUtils;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
@@ -60,6 +61,7 @@
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.QueryWatcher;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulatorFns;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -68,6 +70,7 @@
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.Duration;

import javax.ws.rs.core.MediaType;
import java.io.Closeable;
@@ -84,14 +87,19 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
*/
public class DirectDruidClient<T> implements QueryRunner<T>
{
public static final String QUERY_FAIL_TIME = "queryFailTime";
public static final String QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered";

private static final Logger log = new Logger(DirectDruidClient.class);

private static final Map<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = Maps.newConcurrentMap();
@@ -168,16 +176,24 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> c
final QueryMetrics<? super Query<T>> queryMetrics = toolChest.makeMetrics(query);
queryMetrics.server(host);

long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue();
Member

This introduces an NPE if the query is made bypassing QueryResource, i.e. from inside the QueryEngine of another query type, via QuerySegmentWalker.

Contributor

Druid SQL ran into that too and was fixed via #4305.

I think this is fine since, imo, making queries without going through the resources is something you do 'at your own risk' and isn't an officially supported mode of operation.

Member

QueryResource isn't an official API either, per #4433. Making a query as an HTTP request to the same JVM runtime is ridiculous.

Contributor

What I'm saying is that I would consider the whole concept of queries making other queries as not officially supported. If you want to do that in an extension then go for it but there is no official API for it.

Contributor

Rationale for me is:

  • Queries making other queries is going outside the normal framework. The normal framework is a query comes in to the broker, then fans out to historicals/other data nodes, then fans back in and the broker does a merge and returns results. There's not a standard place for "make another subquery" to fit in.
  • Queries making other queries may lead to metrics not being completely collected, or security rules not being properly applied (some of which are checked in the resources), or requests not being properly logged, or exceptions not being properly alerted on. This needs to be thought through.
  • Users are still free to write extensions where queries make other queries, but due to the above two points, I don't think that should be considered a "public api" at this time.

long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query);
AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED);

final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
{
private long responseStartTimeNs;
private final AtomicLong byteCount = new AtomicLong(0);
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<String> fail = new AtomicReference<>();

@Override
public ClientResponse<InputStream> handleResponse(HttpResponse response)
{
checkQueryTimeout();
Contributor

I think it would be nicer if both of these were to return booleans and the call sites were to just skip things or whatever if the limits are exceeded.

Contributor Author

They contain specific error messages which would need to be repeated at all call sites. Also, they are used in a relatively small scope and look OK.

checkTotalBytesLimit(response.getContent().readableBytes());

log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId());
responseStartTimeNs = System.nanoTime();
queryMetrics.reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter);
Expand Down Expand Up @@ -222,6 +238,11 @@ public int read() throws IOException
@Override
public boolean hasMoreElements()
{
if (fail.get() != null) {
throw new RE(fail.get());
}
checkQueryTimeout();

// done is not set until the last stream has been put in the queue;
// until then keep reporting more elements, afterwards drain whatever remains in the queue.
synchronized (done) {
@@ -232,8 +253,17 @@
@Override
public InputStream nextElement()
{
if (fail.get() != null) {
throw new RE(fail.get());
}

try {
return queue.take();
InputStream is = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
if (is != null) {
return is;
} else {
throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -250,8 +280,13 @@ public ClientResponse<InputStream> handleChunk(
ClientResponse<InputStream> clientResponse, HttpChunk chunk
)
{
checkQueryTimeout();

final ChannelBuffer channelBuffer = chunk.getContent();
final int bytes = channelBuffer.readableBytes();

checkTotalBytesLimit(bytes);

if (bytes > 0) {
try {
queue.put(new ChannelBufferInputStream(channelBuffer));
@@ -308,34 +343,78 @@ public ClientResponse<InputStream> done(ClientResponse<InputStream> clientRespon
@Override
public void exceptionCaught(final ClientResponse<InputStream> clientResponse, final Throwable e)
{
// Don't wait for lock in case the lock had something to do with the error
synchronized (done) {
done.set(true);
// Make a best effort to put a zero length buffer into the queue in case something is waiting on the take()
// If nothing is waiting on take(), this will be closed out anyways.
queue.offer(
new InputStream()
{
@Override
public int read() throws IOException
{
throw new IOException(e);
}
}
String msg = StringUtils.safeFormat(
"Query[%s] url[%s] failed with exception msg [%s]",
query.getId(),
url,
e.getMessage()
);
setupResponseReadFailure(msg, e);
}

private void setupResponseReadFailure(String msg, Throwable th)
{
fail.set(msg);
queue.clear();
queue.offer(new InputStream()
{
@Override
public int read() throws IOException
{
if (th != null) {
throw new IOException(msg, th);
} else {
throw new IOException(msg);
}
}
});

}

// Returns remaining timeout or throws exception if timeout already elapsed.
private long checkQueryTimeout()
{
long timeLeft = timeoutAt - System.currentTimeMillis();
if (timeLeft <= 0) {
String msg = StringUtils.safeFormat("Query[%s] url[%s] timed out.", query.getId(), url);
setupResponseReadFailure(msg, null);
throw new RE(msg);
} else {
return timeLeft;
}
}

private void checkTotalBytesLimit(long bytes)
{
if (maxScatterGatherBytes < Long.MAX_VALUE && totalBytesGathered.addAndGet(bytes) > maxScatterGatherBytes) {
String msg = StringUtils.safeFormat(
"Query[%s] url[%s] max scatter-gather bytes limit reached.",
query.getId(),
url
);
setupResponseReadFailure(msg, null);
throw new RE(msg);
}
}
};

long timeLeft = timeoutAt - System.currentTimeMillis();

if (timeLeft <= 0) {
throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
}

future = httpClient.go(
new Request(
HttpMethod.POST,
new URL(url)
).setContent(objectMapper.writeValueAsBytes(query))
).setContent(objectMapper.writeValueAsBytes(QueryContexts.withTimeout(query, timeLeft)))
.setHeader(
HttpHeaders.Names.CONTENT_TYPE,
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
),
responseHandler
responseHandler,
Duration.millis(timeLeft)
);

queryWatcher.registerQuery(query, future);
@@ -368,8 +447,10 @@ public void onFailure(Throwable t)
? SmileMediaTypes.APPLICATION_JACKSON_SMILE
: MediaType.APPLICATION_JSON
),
new StatusResponseHandler(Charsets.UTF_8)
).get();
new StatusResponseHandler(Charsets.UTF_8),
Duration.standardSeconds(1)
).get(1, TimeUnit.SECONDS);

if (res.getStatus().getCode() >= 500) {
throw new RE(
"Error cancelling query[%s]: queriable node returned status[%d] [%s].",
Expand All @@ -378,7 +459,7 @@ public void onFailure(Throwable t)
);
}
}
catch (IOException | ExecutionException | InterruptedException e) {
catch (IOException | ExecutionException | InterruptedException | TimeoutException e) {
Throwables.propagate(e);
}
}
@@ -396,7 +477,7 @@ public void onFailure(Throwable t)
@Override
public JsonParserIterator<T> make()
{
return new JsonParserIterator<T>(typeRef, future, url);
return new JsonParserIterator<T>(typeRef, future, url, query);
}

@Override
@@ -428,13 +509,15 @@ private class JsonParserIterator<T> implements Iterator<T>, Closeable
private ObjectCodec objectCodec;
private final JavaType typeRef;
private final Future<InputStream> future;
private final Query<T> query;
private final String url;

public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url)
public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url, Query<T> query)
{
this.typeRef = typeRef;
this.future = future;
this.url = url;
this.query = query;
jp = null;
}

@@ -458,6 +541,7 @@ public boolean hasNext()
public T next()
{
init();

try {
final T retVal = objectCodec.readValue(jp, typeRef);
jp.nextToken();
@@ -478,7 +562,19 @@ private void init()
{
if (jp == null) {
try {
jp = objectMapper.getFactory().createParser(future.get());
InputStream is = future.get();
if (is == null) {
throw new QueryInterruptedException(
new ResourceLimitExceededException(
"query[%s] url[%s] timed out or max bytes limit reached.",
query.getId(),
url
),
host
);
} else {
jp = objectMapper.getFactory().createParser(is);
}
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
@@ -491,7 +587,13 @@
}
}
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage());
throw new RE(
e,
"Failure getting results for query[%s] url[%s] because of [%s]",
query.getId(),
url,
e.getMessage()
);
}
catch (CancellationException e) {
throw new QueryInterruptedException(e, host);
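Tying it together: `DirectDruidClient.run` now expects the response context to carry `QUERY_FAIL_TIME` (an absolute deadline in millis) and `QUERY_TOTAL_BYTES_GATHERED` (a shared `AtomicLong`), which the caller (normally QueryResource on the broker) must seed before fanning out. A hedged sketch of that seeding, not the actual QueryResource code:

```java
import io.druid.client.DirectDruidClient;
import io.druid.query.Query;
import io.druid.query.QueryContexts;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of caller-side seeding; the real logic lives in QueryResource.
public class ResponseContextSeeding
{
  public static <T> Map<String, Object> seed(Query<T> query)
  {
    Map<String, Object> responseContext = new HashMap<>();

    // Absolute wall-clock deadline derived from the query timeout; every
    // checkQueryTimeout() call in DirectDruidClient compares against this.
    responseContext.put(
        DirectDruidClient.QUERY_FAIL_TIME,
        System.currentTimeMillis() + QueryContexts.getTimeout(query)
    );

    // Shared accumulator: each DirectDruidClient for this query adds the bytes
    // it reads, so maxScatterGatherBytes is enforced across all data nodes
    // combined, not per node.
    responseContext.put(
        DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED,
        new AtomicLong(0)
    );

    return responseContext;
  }
}
```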