From 20540a44f79e074bd68c325c10d224c751654724 Mon Sep 17 00:00:00 2001 From: Adam Welsh Date: Tue, 29 May 2018 09:58:40 +0100 Subject: [PATCH 1/4] Fix defaultQueryTimeout - set default timeout in query context before query fail time is evaluated Remove unused import --- .../overlord/SingleTaskBackgroundRunner.java | 3 ++- .../java/io/druid/client/DirectDruidClient.java | 16 +--------------- .../druid/server/ClientQuerySegmentWalker.java | 3 ++- .../java/io/druid/server/QueryLifecycle.java | 7 ++----- .../server/SetAndVerifyContextQueryRunner.java | 9 +++++++-- .../druid/server/coordination/ServerManager.java | 3 ++- 6 files changed, 16 insertions(+), 25 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java index 14c8f8cdef67..b141dda068e7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -345,7 +345,8 @@ private QueryRunner getQueryRunnerImpl(Query query) return new SetAndVerifyContextQueryRunner<>( serverConfig, - queryRunner == null ? new NoopQueryRunner<>() : queryRunner + queryRunner == null ? new NoopQueryRunner<>() : queryRunner, + System.currentTimeMillis() ); } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 8db791b1c9ba..ecc988802b4a 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -124,20 +124,6 @@ public static void removeMagicResponseContextFields(Map response responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED); } - public static Map makeResponseContextForQuery(Query query, long startTimeMillis) - { - final Map responseContext = new ConcurrentHashMap<>(); - responseContext.put( - DirectDruidClient.QUERY_FAIL_TIME, - startTimeMillis + QueryContexts.getTimeout(query) - ); - responseContext.put( - DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, - new AtomicLong() - ); - return responseContext; - } - public DirectDruidClient( QueryToolChestWarehouse warehouse, QueryWatcher queryWatcher, @@ -199,7 +185,7 @@ public Sequence run(final QueryPlus queryPlus, final Map c final long requestStartTimeNs = System.nanoTime(); - long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue(); + long timeoutAt = query.getContextValue(QUERY_FAIL_TIME); long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED); diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 9540e897bb14..e664bf075217 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -122,7 +122,8 @@ private QueryRunner makeRunner( baseClientRunner, retryConfig, objectMapper - ) + ), + System.currentTimeMillis() ) ) .applyPreMergeDecoration() diff --git a/server/src/main/java/io/druid/server/QueryLifecycle.java b/server/src/main/java/io/druid/server/QueryLifecycle.java index b3049a2fa339..08b828ce6749 100644 --- a/server/src/main/java/io/druid/server/QueryLifecycle.java +++ b/server/src/main/java/io/druid/server/QueryLifecycle.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; -import io.druid.client.DirectDruidClient; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; @@ -50,6 +49,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** @@ -247,10 +247,7 @@ public QueryResponse execute() { transition(State.AUTHORIZED, State.EXECUTING); - final Map responseContext = DirectDruidClient.makeResponseContextForQuery( - baseQuery, - System.currentTimeMillis() - ); + final Map responseContext = new ConcurrentHashMap<>(); final Sequence res = QueryPlus.wrap(baseQuery) .withIdentity(authenticationResult.getIdentity()) diff --git a/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java b/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java index 9d5a355aaa61..4b344db8ecb4 100644 --- a/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java +++ b/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java @@ -25,6 +25,8 @@ import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.server.initialization.ServerConfig; +import io.druid.client.DirectDruidClient; +import com.google.common.collect.ImmutableMap; import java.util.Map; @@ -35,11 +37,13 @@ public class SetAndVerifyContextQueryRunner implements QueryRunner { private final ServerConfig serverConfig; private final QueryRunner baseRunner; + private final long startTimeMillis; - public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner baseRunner) + public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner baseRunner, long startTimeMillis) { this.serverConfig = serverConfig; this.baseRunner = baseRunner; + this.startTimeMillis = startTimeMillis; } @Override @@ -53,7 +57,7 @@ public Sequence run(QueryPlus queryPlus, Map responseConte public Query withTimeoutAndMaxScatterGatherBytes(Query query, ServerConfig serverConfig) { - return QueryContexts.verifyMaxQueryTimeout( + Query new_query = QueryContexts.verifyMaxQueryTimeout( QueryContexts.withMaxScatterGatherBytes( QueryContexts.withDefaultTimeout( query, @@ -63,5 +67,6 @@ public Query withTimeoutAndMaxScatterGatherBytes(Query query, ServerConfig ), serverConfig.getMaxQueryTimeout() ); + return new_query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, this.startTimeMillis + QueryContexts.getTimeout(new_query))); } } diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 52fdf36cf903..c1b6b43ad9d0 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -316,7 +316,8 @@ private QueryRunner buildAndDecorateQueryRunner( emitter, cpuTimeAccumulator, false - ) + ), + System.currentTimeMillis() ); } } From 986c368498638629489e5c62fa9a61f79475e97b Mon Sep 17 00:00:00 2001 From: Adam Welsh Date: Tue, 29 May 2018 14:54:49 +0100 Subject: [PATCH 2/4] Address failing checks --- server/src/main/java/io/druid/client/DirectDruidClient.java | 2 +- .../src/test/java/io/druid/client/DirectDruidClientTest.java | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index ecc988802b4a..ba36ecf5bcbb 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -116,7 +116,7 @@ public class DirectDruidClient implements QueryRunner private final boolean isSmile; /** - * Removes the magical fields added by {@link #makeResponseContextForQuery(Query, long)}. + * Removes the magical fields added. */ public static void removeMagicResponseContextFields(Map responseContext) { diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index c98e05362bff..d1b0d57053da 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -19,6 +19,7 @@ package io.druid.client; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; @@ -163,6 +164,7 @@ public void testRun() throws Exception serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); Sequence s1 = client1.run(QueryPlus.wrap(query), defaultContext); Assert.assertTrue(capturedRequest.hasCaptured()); @@ -267,6 +269,7 @@ public void testCancel() serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); Sequence results = client1.run(QueryPlus.wrap(query), defaultContext); Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod()); @@ -338,6 +341,7 @@ public void testQueryInterruptionExceptionLogMessage() serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); interruptionFuture.set( new ByteArrayInputStream( StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}") From c5294a56901b1fb794fd3ed71e2f750b6e831d32 Mon Sep 17 00:00:00 2001 From: Adam Welsh Date: Wed, 30 May 2018 16:26:46 +0100 Subject: [PATCH 3/4] Addressing code review comments --- .../overlord/SingleTaskBackgroundRunner.java | 3 +-- .../main/java/io/druid/client/DirectDruidClient.java | 12 +++++++++++- .../io/druid/server/ClientQuerySegmentWalker.java | 3 +-- .../main/java/io/druid/server/QueryLifecycle.java | 4 ++-- .../druid/server/SetAndVerifyContextQueryRunner.java | 8 ++++---- .../io/druid/server/coordination/ServerManager.java | 3 +-- .../java/io/druid/client/DirectDruidClientTest.java | 7 +++---- 7 files changed, 23 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java index b141dda068e7..14c8f8cdef67 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -345,8 +345,7 @@ private QueryRunner getQueryRunnerImpl(Query query) return new SetAndVerifyContextQueryRunner<>( serverConfig, - queryRunner == null ? new NoopQueryRunner<>() : queryRunner, - System.currentTimeMillis() + queryRunner == null ? new NoopQueryRunner<>() : queryRunner ); } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index ba36ecf5bcbb..08e19c434b27 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -116,7 +116,7 @@ public class DirectDruidClient implements QueryRunner private final boolean isSmile; /** - * Removes the magical fields added. + * Removes the magical fields added by {@link #makeResponseContextForQuery()}. */ public static void removeMagicResponseContextFields(Map responseContext) { @@ -124,6 +124,16 @@ public static void removeMagicResponseContextFields(Map response responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED); } + public static Map makeResponseContextForQuery() + { + final Map responseContext = new ConcurrentHashMap<>(); + responseContext.put( + DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, + new AtomicLong() + ); + return responseContext; + } + public DirectDruidClient( QueryToolChestWarehouse warehouse, QueryWatcher queryWatcher, diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index e664bf075217..9540e897bb14 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -122,8 +122,7 @@ private QueryRunner makeRunner( baseClientRunner, retryConfig, objectMapper - ), - System.currentTimeMillis() + ) ) ) .applyPreMergeDecoration() diff --git a/server/src/main/java/io/druid/server/QueryLifecycle.java b/server/src/main/java/io/druid/server/QueryLifecycle.java index 08b828ce6749..9639aa860bc1 100644 --- a/server/src/main/java/io/druid/server/QueryLifecycle.java +++ b/server/src/main/java/io/druid/server/QueryLifecycle.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; +import io.druid.client.DirectDruidClient; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; @@ -49,7 +50,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** @@ -247,7 +247,7 @@ public QueryResponse execute() { transition(State.AUTHORIZED, State.EXECUTING); - final Map responseContext = new ConcurrentHashMap<>(); + final Map responseContext = DirectDruidClient.makeResponseContextForQuery(); final Sequence res = QueryPlus.wrap(baseQuery) .withIdentity(authenticationResult.getIdentity()) diff --git a/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java b/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java index 4b344db8ecb4..8363e1bc6f4d 100644 --- a/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java +++ b/server/src/main/java/io/druid/server/SetAndVerifyContextQueryRunner.java @@ -39,11 +39,11 @@ public class SetAndVerifyContextQueryRunner implements QueryRunner private final QueryRunner baseRunner; private final long startTimeMillis; - public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner baseRunner, long startTimeMillis) + public SetAndVerifyContextQueryRunner(ServerConfig serverConfig, QueryRunner baseRunner) { this.serverConfig = serverConfig; this.baseRunner = baseRunner; - this.startTimeMillis = startTimeMillis; + this.startTimeMillis = System.currentTimeMillis(); } @Override @@ -57,7 +57,7 @@ public Sequence run(QueryPlus queryPlus, Map responseConte public Query withTimeoutAndMaxScatterGatherBytes(Query query, ServerConfig serverConfig) { - Query new_query = QueryContexts.verifyMaxQueryTimeout( + Query newQuery = QueryContexts.verifyMaxQueryTimeout( QueryContexts.withMaxScatterGatherBytes( QueryContexts.withDefaultTimeout( query, @@ -67,6 +67,6 @@ public Query withTimeoutAndMaxScatterGatherBytes(Query query, ServerConfig ), serverConfig.getMaxQueryTimeout() ); - return new_query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, this.startTimeMillis + QueryContexts.getTimeout(new_query))); + return newQuery.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, this.startTimeMillis + QueryContexts.getTimeout(newQuery))); } } diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index c1b6b43ad9d0..52fdf36cf903 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -316,8 +316,7 @@ private QueryRunner buildAndDecorateQueryRunner( emitter, cpuTimeAccumulator, false - ), - System.currentTimeMillis() + ) ); } } diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index d1b0d57053da..060dac6671c2 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -164,8 +164,7 @@ public void testRun() throws Exception serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); - + query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); Sequence s1 = client1.run(QueryPlus.wrap(query), defaultContext); Assert.assertTrue(capturedRequest.hasCaptured()); Assert.assertEquals(url, capturedRequest.getValue().getUrl()); @@ -269,7 +268,7 @@ public void testCancel() serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); + query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); Sequence results = client1.run(QueryPlus.wrap(query), defaultContext); Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod()); @@ -341,7 +340,7 @@ public void testQueryInterruptionExceptionLogMessage() serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); + query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); interruptionFuture.set( new ByteArrayInputStream( StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}") From dd55390487a74c0e535c63166478924f18a3deea Mon Sep 17 00:00:00 2001 From: Adam Welsh Date: Wed, 6 Jun 2018 15:30:54 +0100 Subject: [PATCH 4/4] Removed line that was no longer used --- server/src/main/java/io/druid/client/DirectDruidClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 08e19c434b27..225f91dc42a8 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -120,7 +120,6 @@ public class DirectDruidClient implements QueryRunner */ public static void removeMagicResponseContextFields(Map responseContext) { - responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME); responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED); }