From 9c43b2d3b11cdae6b10909a1cbc38097805a0914 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 5 May 2017 13:21:41 -0500 Subject: [PATCH 1/2] improve query timeout handling and limit max scatter-gather bytes --- docs/content/querying/query-context.md | 1 + .../java/io/druid/query/QueryContexts.java | 11 ++ .../query/ResourceLimitExceededException.java | 6 +- .../io/druid/client/DirectDruidClient.java | 170 +++++++++++++++--- .../java/io/druid/server/QueryResource.java | 8 +- .../druid/client/DirectDruidClientTest.java | 19 +- 6 files changed, 181 insertions(+), 34 deletions(-) diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index d53a239ad8cc..c4ea37a8c77b 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -10,6 +10,7 @@ The query context is used for various query configuration parameters. The follow |property |default | description | |-----------------|----------------------------------------|----------------------| |timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) | +|maxScatterGatherBytes| Long.MAX_VALUE | At Broker, this limits total number of bytes gathered from downstream nodes such as historicals and realtime indexers. If Broker is under heavy load and not consuming the data fast enough coming from downstream nodes then it gets stored in memory and may lead to OOMs. This setting provides a workaround to limit per query max memory utilization for storing data received from downstream nodes.| |priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| |queryId | auto-generated | Unique identifier given to this query. 
If a query ID is set or known, this can be used to cancel the query | |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache | diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index 5a0409e430d4..df77f73db780 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -27,6 +27,7 @@ public class QueryContexts { public static final String PRIORITY_KEY = "priority"; public static final String TIMEOUT_KEY = "timeout"; + public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes"; public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; @@ -98,6 +99,11 @@ public static String getChunkPeriod(Query query) return query.getContextValue(CHUNK_PERIOD_KEY, "P0D"); } + public static long getMaxScatterGatherBytes(Query query) + { + return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); + } + public static boolean hasTimeout(Query query) { return getTimeout(query) != NO_TIMEOUT; @@ -115,6 +121,11 @@ public static long getTimeout(Query query, long defaultTimeout) return timeout; } + public static Query withTimeout(Query query, long timeout) + { + return query.withOverriddenContext(ImmutableMap.of(TIMEOUT_KEY, timeout)); + } + public static Query withDefaultTimeout(Query query, long defaultTimeout) { return query.withOverriddenContext(ImmutableMap.of(QueryContexts.DEFAULT_TIMEOUT_KEY, defaultTimeout)); diff --git a/processing/src/main/java/io/druid/query/ResourceLimitExceededException.java 
b/processing/src/main/java/io/druid/query/ResourceLimitExceededException.java index 1ed038840ba8..06b74ccf4a57 100644 --- a/processing/src/main/java/io/druid/query/ResourceLimitExceededException.java +++ b/processing/src/main/java/io/druid/query/ResourceLimitExceededException.java @@ -19,6 +19,8 @@ package io.druid.query; +import io.druid.common.utils.StringUtils; + /** * Exception indicating that an operation failed because it exceeded some configured resource limit. * @@ -27,8 +29,8 @@ */ public class ResourceLimitExceededException extends RuntimeException { - public ResourceLimitExceededException(String message) + public ResourceLimitExceededException(String message, Object... arguments) { - super(message); + super(StringUtils.safeFormat(message, arguments)); } } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 98d04bea2a48..f96279039884 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -42,6 +42,7 @@ import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.common.utils.StringUtils; import io.druid.java.util.common.IAE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.RE; @@ -60,6 +61,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; import io.druid.query.QueryWatcher; +import io.druid.query.ResourceLimitExceededException; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; import org.jboss.netty.buffer.ChannelBuffer; @@ -68,6 +70,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponse; +import org.joda.time.Duration; import 
javax.ws.rs.core.MediaType; import java.io.Closeable; @@ -84,14 +87,19 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** */ public class DirectDruidClient implements QueryRunner { + public static final String QUERY_START_TIME = "queryStartTime"; + public static final String QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered"; + private static final Logger log = new Logger(DirectDruidClient.class); private static final Map, Pair> typesMap = Maps.newConcurrentMap(); @@ -174,10 +182,14 @@ public Sequence run(final QueryPlus queryPlus, final Map c private final AtomicLong byteCount = new AtomicLong(0); private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); + private final AtomicReference fail = new AtomicReference<>(); @Override public ClientResponse handleResponse(HttpResponse response) { + checkQueryTimeout(); + checkTotalBytesLimit(response.getContent().readableBytes()); + log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId()); responseStartTimeNs = System.nanoTime(); queryMetrics.reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter); @@ -222,6 +234,11 @@ public int read() throws IOException @Override public boolean hasMoreElements() { + if (fail.get() != null) { + throw new RE(fail.get()); + } + checkQueryTimeout(); + // Done is always true until the last stream has be put in the queue. // Then the stream should be spouting good InputStreams. 
synchronized (done) { @@ -232,8 +249,17 @@ public boolean hasMoreElements() @Override public InputStream nextElement() { + if (fail.get() != null) { + throw new RE(fail.get()); + } + try { - return queue.take(); + InputStream is = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS); + if (is != null) { + return is; + } else { + throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -250,8 +276,13 @@ public ClientResponse handleChunk( ClientResponse clientResponse, HttpChunk chunk ) { + checkQueryTimeout(); + final ChannelBuffer channelBuffer = chunk.getContent(); final int bytes = channelBuffer.readableBytes(); + + checkTotalBytesLimit(bytes); + if (bytes > 0) { try { queue.put(new ChannelBufferInputStream(channelBuffer)); @@ -308,34 +339,100 @@ public ClientResponse done(ClientResponse clientRespon @Override public void exceptionCaught(final ClientResponse clientResponse, final Throwable e) { - // Don't wait for lock in case the lock had something to do with the error - synchronized (done) { - done.set(true); - // Make a best effort to put a zero length buffer into the queue in case something is waiting on the take() - // If nothing is waiting on take(), this will be closed out anyways. 
- queue.offer( - new InputStream() - { - @Override - public int read() throws IOException - { - throw new IOException(e); - } - } - ); + String msg = StringUtils.safeFormat( + "Query[%s] url[%s] failed with exception msg [%s]", + query.getId(), + url, + e.getMessage() + ); + setupResponseReadFailure(msg, e); + } + + private void setupResponseReadFailure(String msg, Throwable th) + { + fail.set(msg); + queue.clear(); + queue.offer(new InputStream() + { + @Override + public int read() throws IOException + { + if (th != null) { + throw new IOException(msg, th); + } else { + throw new IOException(msg); + } + } + }); + + } + + // Returns remaining timeout or throws exception if timeout already elapsed. + private long checkQueryTimeout() + { + Object obj = context.get(QUERY_START_TIME); + + if (obj != null) { + long timeLeft = QueryContexts.getTimeout(query) - (System.currentTimeMillis() - ((Long) obj).longValue()); + if (timeLeft <= 0) { + String msg = StringUtils.safeFormat("Query[%s] url[%s] timed out.", query.getId(), url); + setupResponseReadFailure(msg, null); + throw new RE(msg); + } else { + return timeLeft; + } + } else { + return QueryContexts.getTimeout(query); + } + } + + private void checkTotalBytesLimit(long bytes) + { + long limit = QueryContexts.getMaxScatterGatherBytes(query); + + if (limit < Long.MAX_VALUE) { + synchronized (context) { + if(context.get(QUERY_TOTAL_BYTES_GATHERED) == null) { + context.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); + } + } + + AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED); + + if (totalBytesGathered.addAndGet(bytes) > limit) { + String msg = StringUtils.safeFormat( + "Query[%s] url[%s] max scatter-gather bytes limit reached.", + query.getId(), + url + ); + setupResponseReadFailure(msg, null); + throw new RE(msg); + } } } }; + + long timeLeft = QueryContexts.getTimeout(query); + Object queryStartTimeObj = context.get(QUERY_START_TIME); + if (queryStartTimeObj != null) { + timeLeft 
= timeLeft - (System.currentTimeMillis() - ((Long) queryStartTimeObj).longValue()); + } + + if (timeLeft <= 0) { + throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); + } + future = httpClient.go( new Request( HttpMethod.POST, new URL(url) - ).setContent(objectMapper.writeValueAsBytes(query)) + ).setContent(objectMapper.writeValueAsBytes(QueryContexts.withTimeout(query, timeLeft))) .setHeader( HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON ), - responseHandler + responseHandler, + Duration.millis(timeLeft) ); queryWatcher.registerQuery(query, future); @@ -368,8 +465,10 @@ public void onFailure(Throwable t) ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON ), - new StatusResponseHandler(Charsets.UTF_8) - ).get(); + new StatusResponseHandler(Charsets.UTF_8), + Duration.standardSeconds(1) + ).get(1, TimeUnit.SECONDS); + if (res.getStatus().getCode() >= 500) { throw new RE( "Error cancelling query[%s]: queriable node returned status[%d] [%s].", @@ -378,7 +477,7 @@ public void onFailure(Throwable t) ); } } - catch (IOException | ExecutionException | InterruptedException e) { + catch (IOException | ExecutionException | InterruptedException | TimeoutException e) { Throwables.propagate(e); } } @@ -396,7 +495,7 @@ public void onFailure(Throwable t) @Override public JsonParserIterator make() { - return new JsonParserIterator(typeRef, future, url); + return new JsonParserIterator(typeRef, future, url, query); } @Override @@ -428,13 +527,15 @@ private class JsonParserIterator implements Iterator, Closeable private ObjectCodec objectCodec; private final JavaType typeRef; private final Future future; + private final Query query; private final String url; - public JsonParserIterator(JavaType typeRef, Future future, String url) + public JsonParserIterator(JavaType typeRef, Future future, String url, Query query) { this.typeRef = typeRef; this.future = future; this.url = url; + 
this.query = query; jp = null; } @@ -458,6 +559,7 @@ public boolean hasNext() public T next() { init(); + try { final T retVal = objectCodec.readValue(jp, typeRef); jp.nextToken(); @@ -478,7 +580,19 @@ private void init() { if (jp == null) { try { - jp = objectMapper.getFactory().createParser(future.get()); + InputStream is = future.get(); + if (is == null) { + throw new QueryInterruptedException( + new ResourceLimitExceededException( + "query[%s] url[%s] timed out or max bytes limit reached.", + query.getId(), + url + ), + host + ); + } else { + jp = objectMapper.getFactory().createParser(is); + } final JsonToken nextToken = jp.nextToken(); if (nextToken == JsonToken.START_OBJECT) { QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class); @@ -491,7 +605,13 @@ private void init() } } catch (IOException | InterruptedException | ExecutionException e) { - throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage()); + throw new RE( + e, + "Failure getting results for query[%s] url[%s] because of [%s]", + query.getId(), + url, + e.getMessage() + ); } catch (CancellationException e) { throw new QueryInterruptedException(e, host); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 2cb88853b113..92b659895e38 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -30,6 +30,7 @@ import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.DirectDruidClient; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.java.util.common.ISE; @@ -186,6 +187,9 @@ public Response doPost( final String currThreadName = Thread.currentThread().getName(); try { + final Map responseContext = new MapMaker().makeMap(); + 
responseContext.put(DirectDruidClient.QUERY_START_TIME, System.currentTimeMillis()); + query = context.getObjectMapper().readValue(in, Query.class); queryId = query.getId(); if (queryId == null) { @@ -227,7 +231,6 @@ public Response doPost( ); } - final Map responseContext = new MapMaker().makeMap(); final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext); if (prevEtag != null && prevEtag.equals(responseContext.get(HDR_ETAG))) { @@ -330,6 +333,9 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE responseContext.remove(HDR_ETAG); } + responseContext.remove(DirectDruidClient.QUERY_START_TIME); + responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED); + //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331 //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() //and encodes the string using ASCII, so 1 char is = 1 byte diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index c9ebab8eae8d..6d751b0865e0 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -51,6 +51,7 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -74,7 +75,8 @@ public void testRun() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(futureResult) @@ -84,7 +86,8 @@ public void testRun() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + 
EasyMock.anyObject(Duration.class) ) ) .andReturn(futureException) @@ -93,7 +96,8 @@ public void testRun() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(SettableFuture.create()) @@ -194,7 +198,8 @@ public void testCancel() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(cancelledFuture) @@ -203,7 +208,8 @@ public void testCancel() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(cancellationFuture) @@ -271,7 +277,8 @@ public void testQueryInterruptionExceptionLogMessage() throws JsonProcessingExce EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(interruptionFuture) From 882452fdeaa9d3f71fc8dd6474c3352eea9821ae Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 9 May 2017 15:18:53 -0500 Subject: [PATCH 2/2] address review comments --- docs/content/configuration/broker.md | 1 + docs/content/querying/query-context.md | 2 +- .../java/io/druid/query/QueryContexts.java | 21 +++++++ .../io/druid/client/DirectDruidClient.java | 58 +++++++------------ .../java/io/druid/server/QueryResource.java | 10 +++- .../server/initialization/ServerConfig.java | 10 ++++ .../druid/client/DirectDruidClientTest.java | 33 +++++++---- 7 files changed, 82 insertions(+), 53 deletions(-) diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index fa6a01b56476..1afde552524b 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -37,6 +37,7 @@ Druid uses Jetty to serve HTTP requests. 
|`druid.server.http.numThreads`|Number of threads for HTTP requests.|10| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| |`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| +|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This is an advanced configuration that provides protection in case the Broker is under heavy load and is not consuming the gathered data fast enough, leading to OOMs. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that having a large limit is not necessarily bad if the Broker is never under heavy concurrent load, in which case the gathered data is processed quickly, freeing up the memory used.|Long.MAX_VALUE| |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20| |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index c4ea37a8c77b..440c6d8c87bc 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -10,7 +10,7 @@ The query context is used for various query configuration parameters. The follow |property |default | description | |-----------------|----------------------------------------|----------------------| |timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 
0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) | -|maxScatterGatherBytes| Long.MAX_VALUE | At Broker, this limits total number of bytes gathered from downstream nodes such as historicals and realtime indexers. If Broker is under heavy load and not consuming the data fast enough coming from downstream nodes then it gets stored in memory and may lead to OOMs. This setting provides a workaround to limit per query max memory utilization for storing data received from downstream nodes.| +|maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [broker configuration](broker.html) for more details.| |priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. 
When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache | diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index df77f73db780..61103e7fca50 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; public class QueryContexts @@ -99,6 +100,26 @@ public static String getChunkPeriod(Query query) return query.getContextValue(CHUNK_PERIOD_KEY, "P0D"); } + public static Query withMaxScatterGatherBytes(Query query, long maxScatterGatherBytesLimit) + { + Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY); + if (obj == null) { + return query.withOverriddenContext(ImmutableMap.of(MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit)); + } else { + long curr = ((Number) obj).longValue(); + if (curr > maxScatterGatherBytesLimit) { + throw new IAE( + "configured [%s = %s] is more than enforced limit of [%s].", + MAX_SCATTER_GATHER_BYTES_KEY, + curr, + maxScatterGatherBytesLimit + ); + } else { + return query; + } + } + } + public static long getMaxScatterGatherBytes(Query query) { return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index f96279039884..5429485024f4 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -97,7 +97,7 @@ */ public class DirectDruidClient implements QueryRunner { - public static final String QUERY_START_TIME = "queryStartTime"; + public static final String 
QUERY_FAIL_TIME = "queryFailTime"; public static final String QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered"; private static final Logger log = new Logger(DirectDruidClient.class); @@ -176,6 +176,10 @@ public Sequence run(final QueryPlus queryPlus, final Map c final QueryMetrics> queryMetrics = toolChest.makeMetrics(query); queryMetrics.server(host); + long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue(); + long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); + AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED); + final HttpResponseHandler responseHandler = new HttpResponseHandler() { private long responseStartTimeNs; @@ -370,53 +374,31 @@ public int read() throws IOException // Returns remaining timeout or throws exception if timeout already elapsed. private long checkQueryTimeout() { - Object obj = context.get(QUERY_START_TIME); - - if (obj != null) { - long timeLeft = QueryContexts.getTimeout(query) - (System.currentTimeMillis() - ((Long) obj).longValue()); - if (timeLeft <= 0) { - String msg = StringUtils.safeFormat("Query[%s] url[%s] timed out.", query.getId(), url); - setupResponseReadFailure(msg, null); - throw new RE(msg); - } else { - return timeLeft; - } + long timeLeft = timeoutAt - System.currentTimeMillis(); + if (timeLeft <= 0) { + String msg = StringUtils.safeFormat("Query[%s] url[%s] timed out.", query.getId(), url); + setupResponseReadFailure(msg, null); + throw new RE(msg); } else { - return QueryContexts.getTimeout(query); + return timeLeft; } } private void checkTotalBytesLimit(long bytes) { - long limit = QueryContexts.getMaxScatterGatherBytes(query); - - if (limit < Long.MAX_VALUE) { - synchronized (context) { - if(context.get(QUERY_TOTAL_BYTES_GATHERED) == null) { - context.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); - } - } - - AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED); - - if (totalBytesGathered.addAndGet(bytes) > 
limit) { - String msg = StringUtils.safeFormat( - "Query[%s] url[%s] max scatter-gather bytes limit reached.", - query.getId(), - url - ); - setupResponseReadFailure(msg, null); - throw new RE(msg); - } + if (maxScatterGatherBytes < Long.MAX_VALUE && totalBytesGathered.addAndGet(bytes) > maxScatterGatherBytes) { + String msg = StringUtils.safeFormat( + "Query[%s] url[%s] max scatter-gather bytes limit reached.", + query.getId(), + url + ); + setupResponseReadFailure(msg, null); + throw new RE(msg); } } }; - long timeLeft = QueryContexts.getTimeout(query); - Object queryStartTimeObj = context.get(QUERY_START_TIME); - if (queryStartTimeObj != null) { - timeLeft = timeLeft - (System.currentTimeMillis() - ((Long) queryStartTimeObj).longValue()); - } + long timeLeft = timeoutAt - System.currentTimeMillis(); if (timeLeft <= 0) { throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 92b659895e38..40aa8d52bc69 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -188,7 +188,6 @@ public Response doPost( final String currThreadName = Thread.currentThread().getName(); try { final Map responseContext = new MapMaker().makeMap(); - responseContext.put(DirectDruidClient.QUERY_START_TIME, System.currentTimeMillis()); query = context.getObjectMapper().readValue(in, Query.class); queryId = query.getId(); @@ -197,6 +196,13 @@ public Response doPost( query = query.withId(queryId); } query = QueryContexts.withDefaultTimeout(query, config.getDefaultQueryTimeout()); + query = QueryContexts.withMaxScatterGatherBytes(query, config.getMaxScatterGatherBytes()); + + responseContext.put( + DirectDruidClient.QUERY_FAIL_TIME, + System.currentTimeMillis() + QueryContexts.getTimeout(query) + ); + responseContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new 
AtomicLong()); toolChest = warehouse.getToolChest(query); @@ -333,7 +339,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE responseContext.remove(HDR_ETAG); } - responseContext.remove(DirectDruidClient.QUERY_START_TIME); + responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME); responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED); //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331 diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java index 560975abe186..97ba7b888b82 100644 --- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java @@ -41,6 +41,10 @@ public class ServerConfig @Min(0) private long defaultQueryTimeout = 300_000; // 5 minutes + @JsonProperty + @Min(1) + private long maxScatterGatherBytes = Long.MAX_VALUE; + public int getNumThreads() { return numThreads; @@ -56,6 +60,11 @@ public long getDefaultQueryTimeout() return defaultQueryTimeout; } + public long getMaxScatterGatherBytes() + { + return maxScatterGatherBytes; + } + @Override public String toString() { @@ -63,6 +72,7 @@ public String toString() "numThreads=" + numThreads + ", maxIdleTime=" + maxIdleTime + ", defaultQueryTimeout=" + defaultQueryTimeout + + ", maxScatterGatherBytes=" + maxScatterGatherBytes + '}'; } } diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index 6d751b0865e0..3f09859b8545 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -61,9 +61,20 @@ import java.net.URL; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public class 
DirectDruidClientTest { + private final Map defaultContext; + + public DirectDruidClientTest() + { + defaultContext = new HashMap<>(); + defaultContext.put(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE); + defaultContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); + } + @Test public void testRun() throws Exception { @@ -149,23 +160,23 @@ public void testRun() throws Exception serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - HashMap context = Maps.newHashMap(); - Sequence s1 = client1.run(query, context); + + Sequence s1 = client1.run(query, defaultContext); Assert.assertTrue(capturedRequest.hasCaptured()); Assert.assertEquals(url, capturedRequest.getValue().getUrl()); Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); Assert.assertEquals(1, client1.getNumOpenConnections()); // simulate read timeout - Sequence s2 = client1.run(query, context); + Sequence s2 = client1.run(query, defaultContext); Assert.assertEquals(2, client1.getNumOpenConnections()); futureException.setException(new ReadTimeoutException()); Assert.assertEquals(1, client1.getNumOpenConnections()); // subsequent connections should work - Sequence s3 = client1.run(query, context); - Sequence s4 = client1.run(query, context); - Sequence s5 = client1.run(query, context); + Sequence s3 = client1.run(query, defaultContext); + Sequence s4 = client1.run(query, defaultContext); + Sequence s5 = client1.run(query, defaultContext); Assert.assertTrue(client1.getNumOpenConnections() == 4); @@ -176,8 +187,8 @@ public void testRun() throws Exception Assert.assertEquals(new DateTime("2014-01-01T01:02:03Z"), results.get(0).getTimestamp()); Assert.assertEquals(3, client1.getNumOpenConnections()); - client2.run(query, context); - client2.run(query, context); + client2.run(query, defaultContext); + client2.run(query, 
defaultContext); Assert.assertTrue(client2.getNumOpenConnections() == 2); @@ -248,9 +259,8 @@ public void testCancel() throws Exception serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - HashMap context = Maps.newHashMap(); cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); - Sequence results = client1.run(query, context); + Sequence results = client1.run(query, defaultContext); Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod()); Assert.assertEquals(0, client1.getNumOpenConnections()); @@ -319,9 +329,8 @@ public void testQueryInterruptionExceptionLogMessage() throws JsonProcessingExce serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - HashMap context = Maps.newHashMap(); interruptionFuture.set(new ByteArrayInputStream("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}".getBytes())); - Sequence results = client1.run(query, context); + Sequence results = client1.run(query, defaultContext); QueryInterruptedException actualException = null; try {