diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index fa6a01b56476..1afde552524b 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -37,6 +37,7 @@ Druid uses Jetty to serve HTTP requests. |`druid.server.http.numThreads`|Number of threads for HTTP requests.|10| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| |`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| +|`druid.server.http.maxScatterGatherBytes`|Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This is an advance configuration that allows to protect in case broker is under heavy load and not utilizing the data gathered in memory fast enough and leading to OOMs. This limit can be further reduced at query time using `maxScatterGatherBytes` in the context. Note that having large limit is not necessarily bad if broker is never under heavy concurrent load in which case data gathered is processed quickly and freeing up the memory used.|Long.MAX_VALUE| |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20| |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index d53a239ad8cc..440c6d8c87bc 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -10,6 +10,7 @@ The query context is used for various query configuration parameters. The follow |property |default | description | |-----------------|----------------------------------------|----------------------| |timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) | +|maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data nodes such as historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [broker configuration](broker.html) for more details.| |priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses druid.broker.cache.useCache or druid.historical.cache.useCache to determine whether or not to read from the query cache | diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index 5a0409e430d4..61103e7fca50 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -21,12 +21,14 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; public class QueryContexts { public static final String PRIORITY_KEY = "priority"; public static final String TIMEOUT_KEY = "timeout"; + public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes"; public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; @@ -98,6 +100,31 @@ public static String getChunkPeriod(Query query) return query.getContextValue(CHUNK_PERIOD_KEY, "P0D"); } + public static Query withMaxScatterGatherBytes(Query query, long maxScatterGatherBytesLimit) + { + Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY); + if (obj == null) { + return query.withOverriddenContext(ImmutableMap.of(MAX_SCATTER_GATHER_BYTES_KEY, maxScatterGatherBytesLimit)); + } else { + long curr = ((Number) obj).longValue(); + if (curr > maxScatterGatherBytesLimit) { + throw new IAE( + "configured [%s = %s] is more than enforced limit of [%s].", + MAX_SCATTER_GATHER_BYTES_KEY, + curr, + maxScatterGatherBytesLimit + ); + } else { + return query; + } + } + } + + public static long getMaxScatterGatherBytes(Query query) + { + return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); + } + public static boolean hasTimeout(Query query) { return getTimeout(query) != NO_TIMEOUT; @@ -115,6 +142,11 @@ public static long getTimeout(Query query, long defaultTimeout) return timeout; } + public static Query withTimeout(Query query, long timeout) + { + return query.withOverriddenContext(ImmutableMap.of(TIMEOUT_KEY, timeout)); + } + public static Query withDefaultTimeout(Query query, long defaultTimeout) { return query.withOverriddenContext(ImmutableMap.of(QueryContexts.DEFAULT_TIMEOUT_KEY, defaultTimeout)); diff --git a/processing/src/main/java/io/druid/query/ResourceLimitExceededException.java b/processing/src/main/java/io/druid/query/ResourceLimitExceededException.java index 1ed038840ba8..06b74ccf4a57 100644 --- a/processing/src/main/java/io/druid/query/ResourceLimitExceededException.java +++ b/processing/src/main/java/io/druid/query/ResourceLimitExceededException.java @@ -19,6 +19,8 @@ package io.druid.query; +import io.druid.common.utils.StringUtils; + /** * Exception indicating that an operation failed because it exceeded some configured resource limit. * @@ -27,8 +29,8 @@ */ public class ResourceLimitExceededException extends RuntimeException { - public ResourceLimitExceededException(String message) + public ResourceLimitExceededException(String message, Object... arguments) { - super(message); + super(StringUtils.safeFormat(message, arguments)); } } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 98d04bea2a48..5429485024f4 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -42,6 +42,7 @@ import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.common.utils.StringUtils; import io.druid.java.util.common.IAE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.RE; @@ -60,6 +61,7 @@ import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; import io.druid.query.QueryWatcher; +import io.druid.query.ResourceLimitExceededException; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; import org.jboss.netty.buffer.ChannelBuffer; @@ -68,6 +70,7 @@ import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponse; +import org.joda.time.Duration; import javax.ws.rs.core.MediaType; import java.io.Closeable; @@ -84,14 +87,19 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** */ public class DirectDruidClient implements QueryRunner { + public static final String QUERY_FAIL_TIME = "queryFailTime"; + public static final String QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered"; + private static final Logger log = new Logger(DirectDruidClient.class); private static final Map, Pair> typesMap = Maps.newConcurrentMap(); @@ -168,16 +176,24 @@ public Sequence run(final QueryPlus queryPlus, final Map c final QueryMetrics> queryMetrics = toolChest.makeMetrics(query); queryMetrics.server(host); + long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue(); + long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); + AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED); + final HttpResponseHandler responseHandler = new HttpResponseHandler() { private long responseStartTimeNs; private final AtomicLong byteCount = new AtomicLong(0); private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); + private final AtomicReference fail = new AtomicReference<>(); @Override public ClientResponse handleResponse(HttpResponse response) { + checkQueryTimeout(); + checkTotalBytesLimit(response.getContent().readableBytes()); + log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId()); responseStartTimeNs = System.nanoTime(); queryMetrics.reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter); @@ -222,6 +238,11 @@ public int read() throws IOException @Override public boolean hasMoreElements() { + if (fail.get() != null) { + throw new RE(fail.get()); + } + checkQueryTimeout(); + // Done is always true until the last stream has be put in the queue. // Then the stream should be spouting good InputStreams. synchronized (done) { @@ -232,8 +253,17 @@ public boolean hasMoreElements() @Override public InputStream nextElement() { + if (fail.get() != null) { + throw new RE(fail.get()); + } + try { - return queue.take(); + InputStream is = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS); + if (is != null) { + return is; + } else { + throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -250,8 +280,13 @@ public ClientResponse handleChunk( ClientResponse clientResponse, HttpChunk chunk ) { + checkQueryTimeout(); + final ChannelBuffer channelBuffer = chunk.getContent(); final int bytes = channelBuffer.readableBytes(); + + checkTotalBytesLimit(bytes); + if (bytes > 0) { try { queue.put(new ChannelBufferInputStream(channelBuffer)); @@ -308,34 +343,78 @@ public ClientResponse done(ClientResponse clientRespon @Override public void exceptionCaught(final ClientResponse clientResponse, final Throwable e) { - // Don't wait for lock in case the lock had something to do with the error - synchronized (done) { - done.set(true); - // Make a best effort to put a zero length buffer into the queue in case something is waiting on the take() - // If nothing is waiting on take(), this will be closed out anyways. - queue.offer( - new InputStream() - { - @Override - public int read() throws IOException - { - throw new IOException(e); - } - } + String msg = StringUtils.safeFormat( + "Query[%s] url[%s] failed with exception msg [%s]", + query.getId(), + url, + e.getMessage() + ); + setupResponseReadFailure(msg, e); + } + + private void setupResponseReadFailure(String msg, Throwable th) + { + fail.set(msg); + queue.clear(); + queue.offer(new InputStream() + { + @Override + public int read() throws IOException + { + if (th != null) { + throw new IOException(msg, th); + } else { + throw new IOException(msg); + } + } + }); + + } + + // Returns remaining timeout or throws exception if timeout already elapsed. + private long checkQueryTimeout() + { + long timeLeft = timeoutAt - System.currentTimeMillis(); + if (timeLeft >= 0) { + String msg = StringUtils.safeFormat("Query[%s] url[%s] timed out.", query.getId(), url); + setupResponseReadFailure(msg, null); + throw new RE(msg); + } else { + return timeLeft; + } + } + + private void checkTotalBytesLimit(long bytes) + { + if (maxScatterGatherBytes < Long.MAX_VALUE && totalBytesGathered.addAndGet(bytes) > maxScatterGatherBytes) { + String msg = StringUtils.safeFormat( + "Query[%s] url[%s] max scatter-gather bytes limit reached.", + query.getId(), + url ); + setupResponseReadFailure(msg, null); + throw new RE(msg); } } }; + + long timeLeft = timeoutAt - System.currentTimeMillis(); + + if (timeLeft <= 0) { + throw new RE("Query[%s] url[%s] timed out.", query.getId(), url); + } + future = httpClient.go( new Request( HttpMethod.POST, new URL(url) - ).setContent(objectMapper.writeValueAsBytes(query)) + ).setContent(objectMapper.writeValueAsBytes(QueryContexts.withTimeout(query, timeLeft))) .setHeader( HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON ), - responseHandler + responseHandler, + Duration.millis(timeLeft) ); queryWatcher.registerQuery(query, future); @@ -368,8 +447,10 @@ public void onFailure(Throwable t) ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON ), - new StatusResponseHandler(Charsets.UTF_8) - ).get(); + new StatusResponseHandler(Charsets.UTF_8), + Duration.standardSeconds(1) + ).get(1, TimeUnit.SECONDS); + if (res.getStatus().getCode() >= 500) { throw new RE( "Error cancelling query[%s]: queriable node returned status[%d] [%s].", @@ -378,7 +459,7 @@ public void onFailure(Throwable t) ); } } - catch (IOException | ExecutionException | InterruptedException e) { + catch (IOException | ExecutionException | InterruptedException | TimeoutException e) { Throwables.propagate(e); } } @@ -396,7 +477,7 @@ public void onFailure(Throwable t) @Override public JsonParserIterator make() { - return new JsonParserIterator(typeRef, future, url); + return new JsonParserIterator(typeRef, future, url, query); } @Override @@ -428,13 +509,15 @@ private class JsonParserIterator implements Iterator, Closeable private ObjectCodec objectCodec; private final JavaType typeRef; private final Future future; + private final Query query; private final String url; - public JsonParserIterator(JavaType typeRef, Future future, String url) + public JsonParserIterator(JavaType typeRef, Future future, String url, Query query) { this.typeRef = typeRef; this.future = future; this.url = url; + this.query = query; jp = null; } @@ -458,6 +541,7 @@ public boolean hasNext() public T next() { init(); + try { final T retVal = objectCodec.readValue(jp, typeRef); jp.nextToken(); @@ -478,7 +562,19 @@ private void init() { if (jp == null) { try { - jp = objectMapper.getFactory().createParser(future.get()); + InputStream is = future.get(); + if (is == null) { + throw new QueryInterruptedException( + new ResourceLimitExceededException( + "query[%s] url[%s] timed out or max bytes limit reached.", + query.getId(), + url + ), + host + ); + } else { + jp = objectMapper.getFactory().createParser(is); + } final JsonToken nextToken = jp.nextToken(); if (nextToken == JsonToken.START_OBJECT) { QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class); @@ -491,7 +587,13 @@ private void init() } } catch (IOException | InterruptedException | ExecutionException e) { - throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage()); + throw new RE( + e, + "Failure getting results for query[%s] url[%s] because of [%s]", + query.getId(), + url, + e.getMessage() + ); } catch (CancellationException e) { throw new QueryInterruptedException(e, host); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 2cb88853b113..40aa8d52bc69 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -30,6 +30,7 @@ import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.DirectDruidClient; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.java.util.common.ISE; @@ -186,6 +187,8 @@ public Response doPost( final String currThreadName = Thread.currentThread().getName(); try { + final Map responseContext = new MapMaker().makeMap(); + query = context.getObjectMapper().readValue(in, Query.class); queryId = query.getId(); if (queryId == null) { @@ -193,6 +196,13 @@ public Response doPost( query = query.withId(queryId); } query = QueryContexts.withDefaultTimeout(query, config.getDefaultQueryTimeout()); + query = QueryContexts.withMaxScatterGatherBytes(query, config.getMaxScatterGatherBytes()); + + responseContext.put( + DirectDruidClient.QUERY_FAIL_TIME, + System.currentTimeMillis() + QueryContexts.getTimeout(query) + ); + responseContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); toolChest = warehouse.getToolChest(query); @@ -227,7 +237,6 @@ public Response doPost( ); } - final Map responseContext = new MapMaker().makeMap(); final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext); if (prevEtag != null && prevEtag.equals(responseContext.get(HDR_ETAG))) { @@ -330,6 +339,9 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE responseContext.remove(HDR_ETAG); } + responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME); + responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED); + //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331 //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() //and encodes the string using ASCII, so 1 char is = 1 byte diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java index 560975abe186..97ba7b888b82 100644 --- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java @@ -41,6 +41,10 @@ public class ServerConfig @Min(0) private long defaultQueryTimeout = 300_000; // 5 minutes + @JsonProperty + @Min(1) + private long maxScatterGatherBytes = Long.MAX_VALUE; + public int getNumThreads() { return numThreads; @@ -56,6 +60,11 @@ public long getDefaultQueryTimeout() return defaultQueryTimeout; } + public long getMaxScatterGatherBytes() + { + return maxScatterGatherBytes; + } + @Override public String toString() { @@ -63,6 +72,7 @@ public String toString() "numThreads=" + numThreads + ", maxIdleTime=" + maxIdleTime + ", defaultQueryTimeout=" + defaultQueryTimeout + + ", maxScatterGatherBytes=" + maxScatterGatherBytes + '}'; } } diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index c9ebab8eae8d..3f09859b8545 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -51,6 +51,7 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; @@ -60,9 +61,20 @@ import java.net.URL; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public class DirectDruidClientTest { + private final Map defaultContext; + + public DirectDruidClientTest() + { + defaultContext = new HashMap<>(); + defaultContext.put(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE); + defaultContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); + } + @Test public void testRun() throws Exception { @@ -74,7 +86,8 @@ public void testRun() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(futureResult) @@ -84,7 +97,8 @@ public void testRun() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(futureException) @@ -93,7 +107,8 @@ public void testRun() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(SettableFuture.create()) @@ -145,23 +160,23 @@ public void testRun() throws Exception serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - HashMap context = Maps.newHashMap(); - Sequence s1 = client1.run(query, context); + + Sequence s1 = client1.run(query, defaultContext); Assert.assertTrue(capturedRequest.hasCaptured()); Assert.assertEquals(url, capturedRequest.getValue().getUrl()); Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); Assert.assertEquals(1, client1.getNumOpenConnections()); // simulate read timeout - Sequence s2 = client1.run(query, context); + Sequence s2 = client1.run(query, defaultContext); Assert.assertEquals(2, client1.getNumOpenConnections()); futureException.setException(new ReadTimeoutException()); Assert.assertEquals(1, client1.getNumOpenConnections()); // subsequent connections should work - Sequence s3 = client1.run(query, context); - Sequence s4 = client1.run(query, context); - Sequence s5 = client1.run(query, context); + Sequence s3 = client1.run(query, defaultContext); + Sequence s4 = client1.run(query, defaultContext); + Sequence s5 = client1.run(query, defaultContext); Assert.assertTrue(client1.getNumOpenConnections() == 4); @@ -172,8 +187,8 @@ public void testRun() throws Exception Assert.assertEquals(new DateTime("2014-01-01T01:02:03Z"), results.get(0).getTimestamp()); Assert.assertEquals(3, client1.getNumOpenConnections()); - client2.run(query, context); - client2.run(query, context); + client2.run(query, defaultContext); + client2.run(query, defaultContext); Assert.assertTrue(client2.getNumOpenConnections() == 2); @@ -194,7 +209,8 @@ public void testCancel() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(cancelledFuture) @@ -203,7 +219,8 @@ public void testCancel() throws Exception EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(cancellationFuture) @@ -242,9 +259,8 @@ public void testCancel() throws Exception serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - HashMap context = Maps.newHashMap(); cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); - Sequence results = client1.run(query, context); + Sequence results = client1.run(query, defaultContext); Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod()); Assert.assertEquals(0, client1.getNumOpenConnections()); @@ -271,7 +287,8 @@ public void testQueryInterruptionExceptionLogMessage() throws JsonProcessingExce EasyMock.expect( httpClient.go( EasyMock.capture(capturedRequest), - EasyMock.anyObject() + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) ) ) .andReturn(interruptionFuture) @@ -312,9 +329,8 @@ public void testQueryInterruptionExceptionLogMessage() throws JsonProcessingExce serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - HashMap context = Maps.newHashMap(); interruptionFuture.set(new ByteArrayInputStream("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}".getBytes())); - Sequence results = client1.run(query, context); + Sequence results = client1.run(query, defaultContext); QueryInterruptedException actualException = null; try {