From 5eaae847c125146a98fcf60c1f92dd1a531a8d94 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 23 Mar 2020 12:24:06 -0700 Subject: [PATCH 1/4] add lane enforcement for joinish queries --- .../CachingClusteredClientBenchmark.java | 12 +- .../moving-average-query/pom.xml | 7 + .../movingaverage/MovingAverageQueryTest.java | 11 +- .../server/ClientQuerySegmentWalker.java | 3 +- .../druid/server/LocalQuerySegmentWalker.java | 37 ++-- .../apache/druid/server/QueryScheduler.java | 13 ++ ...chingClusteredClientFunctionalityTest.java | 7 +- .../server/ClientQuerySegmentWalkerTest.java | 65 ++++++- .../server/ObservableQueryScheduler.java | 159 ++++++++++++++++++ .../druid/server/QueryResourceTest.java | 7 +- .../druid/server/QuerySchedulerTest.java | 93 ---------- .../apache/druid/server/QueryStackTests.java | 22 ++- .../druid/sql/calcite/util/CalciteTests.java | 5 +- .../SpecificSegmentsQuerySegmentWalker.java | 5 +- 14 files changed, 300 insertions(+), 146 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index febebbfcab7d..94b560e8f4df 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -104,11 +104,8 @@ import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; -import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; -import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; @@ -343,12 +340,7 @@ public > QueryToolChest getToolChest new DruidHttpClientConfig(), processingConfig, forkJoinPool, - new QueryScheduler( - 0, - ManualQueryPrioritizationStrategy.INSTANCE, - NoQueryLaningStrategy.INSTANCE, - new ServerConfig() - ) + QueryStackTests.DEFAULT_NOOP_SCHEDULER ); } diff --git a/extensions-contrib/moving-average-query/pom.xml b/extensions-contrib/moving-average-query/pom.xml index c5ad9eebf6ca..6c77b6051182 100644 --- a/extensions-contrib/moving-average-query/pom.xml +++ b/extensions-contrib/moving-average-query/pom.xml @@ -105,6 +105,13 @@ test-jar test + + org.apache.druid + druid-server + ${project.parent.version} + test-jar + test + junit junit diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 8d76692e9c85..971f8ed8b264 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -65,10 +65,8 @@ import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.server.ClientQuerySegmentWalker; -import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; -import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.TimelineLookup; import org.hamcrest.core.IsInstanceOf; @@ -365,12 +363,7 @@ public String getFormatString() } }, ForkJoinPool.commonPool(), - new QueryScheduler( - 0, - ManualQueryPrioritizationStrategy.INSTANCE, - NoQueryLaningStrategy.INSTANCE, - new ServerConfig() - ) + QueryStackTests.DEFAULT_NOOP_SCHEDULER ); ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker( diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index c3379bab0214..5a467645d08f 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -462,7 +462,8 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) throw new ISE("Unexpected query received"); } - return baseRunner.run(queryPlus.withQuery(newQuery), responseContext); + Sequence result = baseRunner.run(queryPlus.withQuery(newQuery), responseContext); + return result; } } } diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index 0a01b8c4bb0c..1198034f639e 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -20,6 +20,7 @@ package org.apache.druid.server; import com.google.inject.Inject; +import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; @@ -27,6 +28,7 @@ import org.apache.druid.query.FluentQueryRunnerBuilder; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; @@ -39,6 +41,8 @@ import org.apache.druid.segment.join.Joinables; import org.joda.time.Interval; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.StreamSupport; @@ -57,6 +61,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker private final QueryRunnerFactoryConglomerate conglomerate; private final SegmentWrangler segmentWrangler; private final JoinableFactory joinableFactory; + private final QueryScheduler scheduler; private final ServiceEmitter emitter; @Inject @@ -64,12 +69,14 @@ public LocalQuerySegmentWalker( QueryRunnerFactoryConglomerate conglomerate, SegmentWrangler segmentWrangler, JoinableFactory joinableFactory, + QueryScheduler scheduler, ServiceEmitter emitter ) { this.conglomerate = conglomerate; this.segmentWrangler = segmentWrangler; this.joinableFactory = joinableFactory; + this.scheduler = scheduler; this.emitter = emitter; } @@ -82,21 +89,23 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, final throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource()); } - final AtomicLong cpuAccumulator = new AtomicLong(0L); - final QueryRunnerFactory> queryRunnerFactory = conglomerate.findFactory(query); final Iterable segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals); + final Query prioritizedAndLaned = prioritizeAndLaneQuery(query, segments); + + final AtomicLong cpuAccumulator = new AtomicLong(0L); final Function segmentMapFn = Joinables.createSegmentMapFn( analysis.getPreJoinableClauses(), joinableFactory, cpuAccumulator, - QueryContexts.getEnableJoinFilterPushDown(query), - QueryContexts.getEnableJoinFilterRewrite(query), - QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query), - QueryContexts.getJoinFilterRewriteMaxSize(query), - query.getFilter() == null ? null : query.getFilter().toFilter(), - query.getVirtualColumns() + QueryContexts.getEnableJoinFilterPushDown(prioritizedAndLaned), + QueryContexts.getEnableJoinFilterRewrite(prioritizedAndLaned), + QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(prioritizedAndLaned), + QueryContexts.getJoinFilterRewriteMaxSize(prioritizedAndLaned), + prioritizedAndLaned.getFilter() == null ? null : prioritizedAndLaned.getFilter().toFilter(), + prioritizedAndLaned.getVirtualColumns() ); + final QueryRunnerFactory> queryRunnerFactory = conglomerate.findFactory(prioritizedAndLaned); final QueryRunner baseRunner = queryRunnerFactory.mergeRunners( Execs.directExecutor(), () -> StreamSupport.stream(segments.spliterator(), false) @@ -107,17 +116,25 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, final // Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where // it is already supported. return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest()) - .create(baseRunner) + .create(scheduler.wrapQueryRunner(baseRunner)) .applyPreMergeDecoration() .mergeResults() .applyPostMergeDecoration() .emitCPUTimeMetric(emitter, cpuAccumulator); } - @Override public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) { // SegmentWranglers only work based on intervals and cannot run with specific segments. throw new ISE("Cannot run with specific segments"); } + + private Query prioritizeAndLaneQuery(Query query, Iterable segments) + { + Set segmentServerSelectors = new HashSet<>(); + for (Segment s : segments) { + segmentServerSelectors.add(new SegmentServerSelector(null, s.getId().toDescriptor())); + } + return scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), segmentServerSelectors); + } } diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java index d999ab3d9d8b..2c7a23b5f559 100644 --- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java +++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java @@ -30,10 +30,12 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.LazySequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryWatcher; import org.apache.druid.server.initialization.ServerConfig; @@ -140,6 +142,17 @@ public Sequence run(Query query, Sequence resultSequence) return resultSequence.withBaggage(() -> finishLanes(bulkheads)); } + /** + * Returns a {@link QueryRunner} that will call {@link QueryScheduler#run} when {@link QueryRunner#run} is called. + */ + public QueryRunner wrapQueryRunner(QueryRunner baseRunner) + { + return (queryPlus, responseContext) -> + QueryScheduler.this.run( + queryPlus.getQuery(), new LazySequence<>(() -> baseRunner.run(queryPlus, responseContext)) + ); + } + /** * Forcibly cancel all futures that have been registered to a specific query id */ diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 779fd5478d0c..5303989b80cc 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -47,11 +47,8 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.planning.DataSourceAnalysis; -import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; -import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -335,7 +332,7 @@ public int getMergePoolParallelism() } }, ForkJoinPool.commonPool(), - new QueryScheduler(0, ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, new ServerConfig()) + QueryStackTests.DEFAULT_NOOP_SCHEDULER ); } diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 2215dcb162ae..c98c7768decd 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -70,6 +70,8 @@ import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -151,12 +153,20 @@ public class ClientQuerySegmentWalkerTest // version VERSION, and shard spec SHARD_SPEC. private ClientQuerySegmentWalker walker; + private ObservableQueryScheduler scheduler; + @Before public void setUp() { closer = Closer.create(); conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer); - initWalker(ImmutableMap.of()); + scheduler = new ObservableQueryScheduler( + 8, + ManualQueryPrioritizationStrategy.INSTANCE, + NoQueryLaningStrategy.INSTANCE, + new ServerConfig() + ); + initWalker(ImmutableMap.of(), scheduler); } @After @@ -182,6 +192,11 @@ public void testTimeseriesOnTable() ImmutableList.of(ExpectedQuery.cluster(query)), ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L}) ); + + Assert.assertEquals(1, scheduler.getTotalRun().get()); + Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(1, scheduler.getTotalAcquired().get()); + Assert.assertEquals(1, scheduler.getTotalReleased().get()); } @Test @@ -200,6 +215,11 @@ public void testTimeseriesOnInline() ImmutableList.of(ExpectedQuery.local(query)), ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L}) ); + + Assert.assertEquals(1, scheduler.getTotalRun().get()); + Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(1, scheduler.getTotalAcquired().get()); + Assert.assertEquals(1, scheduler.getTotalReleased().get()); } @Test @@ -236,6 +256,13 @@ public void testTimeseriesOnGroupByOnTable() ), ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L}) ); + + // note: this should really be 1, but in the interim queries that are composed of multiple queries count each + // invocation of either the cluster or local walker in ClientQuerySegmentWalker + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); } @Test @@ -263,6 +290,11 @@ public void testGroupByOnGroupByOnTable() ImmutableList.of(ExpectedQuery.cluster(subquery)), ImmutableList.of(new Object[]{3L}) ); + + Assert.assertEquals(1, scheduler.getTotalRun().get()); + Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(1, scheduler.getTotalAcquired().get()); + Assert.assertEquals(1, scheduler.getTotalReleased().get()); } @Test @@ -299,6 +331,13 @@ public void testGroupByOnUnionOfTwoTables() new Object[]{"z", 1L} ) ); + + // note: this should really be 1, but in the interim queries that are composed of multiple queries count each + // invocation of either the cluster or local walker in ClientQuerySegmentWalker + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); } @Test @@ -351,6 +390,13 @@ public void testJoinOnGroupByOnTable() ), ImmutableList.of(new Object[]{"y", "y", 1L}) ); + + // note: this should really be 1, but in the interim queries that are composed of multiple queries count each + // invocation of either the cluster or local walker in ClientQuerySegmentWalker + Assert.assertEquals(2, scheduler.getTotalRun().get()); + Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(2, scheduler.getTotalAcquired().get()); + Assert.assertEquals(2, scheduler.getTotalReleased().get()); } @Test @@ -408,9 +454,17 @@ public void testTimeseriesOnGroupByOnTableErrorTooManyRows() } /** - * Initialize (or reinitialize) our {@link #walker} and {@link #closer}. + * Initialize (or reinitialize) our {@link #walker} and {@link #closer} with default scheduler. */ private void initWalker(final Map serverProperties) + { + initWalker(serverProperties, QueryStackTests.DEFAULT_NOOP_SCHEDULER); + } + + /** + * Initialize (or reinitialize) our {@link #walker} and {@link #closer}. + */ + private void initWalker(final Map serverProperties, QueryScheduler schedulerForTest) { final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); final ServerConfig serverConfig = jsonMapper.convertValue(serverProperties, ServerConfig.class); @@ -472,7 +526,7 @@ BAR, makeTimeline(BAR, BAR_INLINE) ), joinableFactory, conglomerate, - null /* QueryScheduler */ + schedulerForTest ), ClusterOrLocal.CLUSTER ), @@ -480,8 +534,9 @@ BAR, makeTimeline(BAR, BAR_INLINE) QueryStackTests.createLocalQuerySegmentWalker( conglomerate, segmentWrangler, - joinableFactory - ), + joinableFactory, + schedulerForTest + ), ClusterOrLocal.LOCAL ), conglomerate, diff --git a/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java new file mode 100644 index 000000000000..21796d25946d --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java @@ -0,0 +1,159 @@ +package org.apache.druid.server; + +import io.github.resilience4j.bulkhead.Bulkhead; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.server.initialization.ServerConfig; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + * {@link QueryScheduler} for testing, with counters on its internal functions so its operation can be observed + * and verified by tests + */ +public class ObservableQueryScheduler extends QueryScheduler +{ + private final AtomicLong totalAcquired; + private final AtomicLong totalReleased; + private final AtomicLong laneAcquired; + private final AtomicLong laneNotAcquired; + private final AtomicLong laneReleased; + private final AtomicLong totalPrioritizedAndLaned; + private final AtomicLong totalRun; + + public ObservableQueryScheduler( + int totalNumThreads, + QueryPrioritizationStrategy prioritizationStrategy, + QueryLaningStrategy laningStrategy, + ServerConfig serverConfig + ) + { + super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig); + + totalAcquired = new AtomicLong(); + totalReleased = new AtomicLong(); + laneAcquired = new AtomicLong(); + laneNotAcquired = new AtomicLong(); + laneReleased = new AtomicLong(); + totalPrioritizedAndLaned = new AtomicLong(); + totalRun = new AtomicLong(); + } + + @Override + public Sequence run( + Query query, Sequence resultSequence + ) + { + return super.run(query, resultSequence).withBaggage(totalRun::incrementAndGet); + } + + @Override + public Query prioritizeAndLaneQuery( + QueryPlus queryPlus, Set segments + ) + { + totalPrioritizedAndLaned.incrementAndGet(); + return super.prioritizeAndLaneQuery(queryPlus, segments); + } + + @Override + List acquireLanes(Query query) + { + List bulkheads = super.acquireLanes(query); + if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { + totalAcquired.incrementAndGet(); + } + if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { + laneAcquired.incrementAndGet(); + } + + return bulkheads; + } + + @Override + void releaseLanes(List bulkheads) + { + super.releaseLanes(bulkheads); + if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { + totalReleased.incrementAndGet(); + } + if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { + laneReleased.incrementAndGet(); + if (bulkheads.size() == 1) { + laneNotAcquired.incrementAndGet(); + } + } + } + + @Override + void finishLanes(List bulkheads) + { + super.finishLanes(bulkheads); + if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { + totalReleased.incrementAndGet(); + } + if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { + laneReleased.incrementAndGet(); + } + } + + /** + * Number of times that 'total' query count semaphore was acquired + */ + public AtomicLong getTotalAcquired() + { + return totalAcquired; + } + + /** + * Number of times that 'total' query count semaphore was released + */ + public AtomicLong getTotalReleased() + { + return totalReleased; + } + + /** + * Number of times that the query count semaphore of any lane was acquired + */ + public AtomicLong getLaneAcquired() + { + return laneAcquired; + } + + /** + * Number of times that the query count semaphore of any lane was acquired but the 'total' semaphore was NOT acquired + */ + public AtomicLong getLaneNotAcquired() + { + return laneNotAcquired; + } + + /** + * Number of times that the query count semaphore of any lane was released + */ + public AtomicLong getLaneReleased() + { + return laneReleased; + } + + /** + * Number of times that {@link QueryScheduler#prioritizeAndLaneQuery} was called + */ + public AtomicLong getTotalPrioritizedAndLaned() + { + return totalPrioritizedAndLaned; + } + + /** + * Number of times that {@link QueryScheduler#run} was called + */ + public AtomicLong getTotalRun() + { + return totalRun; + } +} diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index a0604e1e4179..42a3cc24c2b0 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -174,12 +174,7 @@ public void setup() EasyMock.expect(testServletRequest.getHeader("Accept")).andReturn(MediaType.APPLICATION_JSON).anyTimes(); EasyMock.expect(testServletRequest.getHeader(QueryResource.HEADER_IF_NONE_MATCH)).andReturn(null).anyTimes(); EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes(); - queryScheduler = new QueryScheduler( - 8, - ManualQueryPrioritizationStrategy.INSTANCE, - NoQueryLaningStrategy.INSTANCE, - new ServerConfig() - ); + queryScheduler = QueryStackTests.DEFAULT_NOOP_SCHEDULER; testRequestLogger = new TestRequestLogger(); queryResource = new QueryResource( new QueryLifecycleFactory( diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index 683ef3b55981..5944a42cd0c9 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -30,7 +30,6 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.ProvisionException; -import io.github.resilience4j.bulkhead.Bulkhead; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.JsonConfigProvider; @@ -73,7 +72,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; public class QuerySchedulerTest { @@ -709,95 +707,4 @@ private Injector createInjector() ); return injector; } - - private static class ObservableQueryScheduler extends QueryScheduler - { - private final AtomicLong totalAcquired; - private final AtomicLong totalReleased; - private final AtomicLong laneAcquired; - private final AtomicLong laneNotAcquired; - private final AtomicLong laneReleased; - - public ObservableQueryScheduler( - int totalNumThreads, - QueryPrioritizationStrategy prioritizationStrategy, - QueryLaningStrategy laningStrategy, - ServerConfig serverConfig - ) - { - super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig); - - totalAcquired = new AtomicLong(); - totalReleased = new AtomicLong(); - laneAcquired = new AtomicLong(); - laneNotAcquired = new AtomicLong(); - laneReleased = new AtomicLong(); - } - - @Override - List acquireLanes(Query query) - { - List bulkheads = super.acquireLanes(query); - if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { - totalAcquired.incrementAndGet(); - } - if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { - laneAcquired.incrementAndGet(); - } - - return bulkheads; - } - - @Override - void releaseLanes(List bulkheads) - { - super.releaseLanes(bulkheads); - if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { - totalReleased.incrementAndGet(); - } - if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { - laneReleased.incrementAndGet(); - if (bulkheads.size() == 1) { - laneNotAcquired.incrementAndGet(); - } - } - } - - @Override - void finishLanes(List bulkheads) - { - super.finishLanes(bulkheads); - if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { - totalReleased.incrementAndGet(); - } - if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { - laneReleased.incrementAndGet(); - } - } - - public AtomicLong getTotalAcquired() - { - return totalAcquired; - } - - public AtomicLong getTotalReleased() - { - return totalReleased; - } - - public AtomicLong getLaneAcquired() - { - return laneAcquired; - } - - public AtomicLong getLaneNotAcquired() - { - return laneNotAcquired; - } - - public AtomicLong getLaneReleased() - { - return laneReleased; - } - } } diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index a07e14ed5fa2..766040e1dae2 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -64,6 +64,8 @@ import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.VersionedIntervalTimeline; import javax.annotation.Nullable; @@ -75,6 +77,12 @@ */ public class QueryStackTests { + public static final QueryScheduler DEFAULT_NOOP_SCHEDULER = new QueryScheduler( + 0, + ManualQueryPrioritizationStrategy.INSTANCE, + NoQueryLaningStrategy.INSTANCE, + new ServerConfig() + ); private static final ServiceEmitter EMITTER = new NoopServiceEmitter(); private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024; @@ -148,10 +156,17 @@ public static TestClusterQuerySegmentWalker createClusterQuerySegmentWalker( public static LocalQuerySegmentWalker createLocalQuerySegmentWalker( final QueryRunnerFactoryConglomerate conglomerate, final SegmentWrangler segmentWrangler, - final JoinableFactory joinableFactory - ) + final JoinableFactory joinableFactory, + final QueryScheduler scheduler + ) { - return new LocalQuerySegmentWalker(conglomerate, segmentWrangler, joinableFactory, EMITTER); + return new LocalQuerySegmentWalker( + conglomerate, + segmentWrangler, + joinableFactory, + scheduler, + EMITTER + ); } /** @@ -255,4 +270,5 @@ public int getNumMergeBuffers() return conglomerate; } + } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 5d446c54a83e..5ae13902c7e1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -77,6 +77,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.log.NoopRequestLogger; import org.apache.druid.server.security.Access; @@ -572,13 +573,13 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker( final File tmpDir ) { - return createMockWalker(conglomerate, tmpDir, null); + return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER); } public static SpecificSegmentsQuerySegmentWalker createMockWalker( final QueryRunnerFactoryConglomerate conglomerate, final File tmpDir, - @Nullable final QueryScheduler scheduler + final QueryScheduler scheduler ) { final QueryableIndex index1 = IndexBuilder diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index 50134ffcb353..339aa4dd39fa 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -85,7 +85,7 @@ public SpecificSegmentsQuerySegmentWalker( final QueryRunnerFactoryConglomerate conglomerate, final LookupExtractorFactoryContainerProvider lookupProvider, @Nullable final JoinableFactory joinableFactory, - @Nullable final QueryScheduler scheduler + final QueryScheduler scheduler ) { final JoinableFactory joinableFactoryToUse; @@ -116,6 +116,7 @@ public SpecificSegmentsQuerySegmentWalker( .put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider)) .build() ), + scheduler, joinableFactoryToUse ), conglomerate, @@ -146,7 +147,7 @@ public Optional get(String lookupName) } }, null, - null + QueryStackTests.DEFAULT_NOOP_SCHEDULER ); } From 606af3f8c4b80c59df9d5664c911edaf3aff1cbb Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 25 Mar 2020 05:11:32 -0700 Subject: [PATCH 2/4] oops --- .../server/ObservableQueryScheduler.java | 19 +++++++++++++++++++ .../SpecificSegmentsQuerySegmentWalker.java | 4 ++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java index 21796d25946d..3b1e69116936 100644 --- a/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java +++ b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.server; import io.github.resilience4j.bulkhead.Bulkhead; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index 339aa4dd39fa..1da0fbabf9c9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -116,8 +116,8 @@ public SpecificSegmentsQuerySegmentWalker( .put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider)) .build() ), - scheduler, - joinableFactoryToUse + joinableFactoryToUse, + scheduler ), conglomerate, new ServerConfig() From beb572259a54eb906aca50319709e35af4177d70 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 25 Mar 2020 12:12:58 -0700 Subject: [PATCH 3/4] style --- .../org/apache/druid/server/ObservableQueryScheduler.java | 6 ++++-- .../test/java/org/apache/druid/server/QueryStackTests.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java index 3b1e69116936..638f5f28dc9f 100644 --- a/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java +++ b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java @@ -64,7 +64,8 @@ public ObservableQueryScheduler( @Override public Sequence run( - Query query, Sequence resultSequence + Query query, + Sequence resultSequence ) { return super.run(query, resultSequence).withBaggage(totalRun::incrementAndGet); @@ -72,7 +73,8 @@ public Sequence run( @Override public Query prioritizeAndLaneQuery( - QueryPlus queryPlus, Set segments + QueryPlus queryPlus, + Set segments ) { totalPrioritizedAndLaned.incrementAndGet(); diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 766040e1dae2..1370ed73ebac 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -158,7 +158,7 @@ public static LocalQuerySegmentWalker createLocalQuerySegmentWalker( final SegmentWrangler segmentWrangler, final JoinableFactory joinableFactory, final QueryScheduler scheduler - ) + ) { return new LocalQuerySegmentWalker( conglomerate, From 830a628378639aa5a9fb47086d38dc131ddc0ae4 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 27 Mar 2020 02:48:51 -0700 Subject: [PATCH 4/4] review stuffs --- .../druid/client/SegmentServerSelector.java | 26 ++++++++++++++++++- .../server/ClientQuerySegmentWalker.java | 3 +-- .../druid/server/LocalQuerySegmentWalker.java | 2 +- .../server/TestClusterQuerySegmentWalker.java | 6 +++-- 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java index 007b7a254591..5f5de0e78136 100644 --- a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java @@ -19,22 +19,46 @@ package org.apache.druid.client; +import com.google.common.base.Preconditions; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.SegmentDescriptor; +import javax.annotation.Nullable; + /** * Given a {@link SegmentDescriptor}, get a {@link ServerSelector} to use to pick a {@link DruidServer} to query. * - * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data + * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data. Used + * by {@link org.apache.druid.server.LocalQuerySegmentWalker} on the broker for on broker queries */ public class SegmentServerSelector extends Pair { + /** + * This is for a segment hosted on a remote server, where {@link ServerSelector} may be used to pick + * a {@link DruidServer} to query. + */ public SegmentServerSelector(ServerSelector server, SegmentDescriptor segment) { super(server, segment); + Preconditions.checkNotNull(server, "ServerSelector must not be null"); + Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null"); + } + + /** + * This is for a segment hosted locally + */ + public SegmentServerSelector(SegmentDescriptor segment) + { + super(null, segment); + Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null"); } + /** + * This may be null if {@link SegmentDescriptor} is locally available, but will definitely not be null for segments + * which must be queried remotely (e.g. {@link CachingClusteredClient}) + */ + @Nullable public ServerSelector getServer() { return lhs; diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 5a467645d08f..c3379bab0214 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -462,8 +462,7 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) throw new ISE("Unexpected query received"); } - Sequence result = baseRunner.run(queryPlus.withQuery(newQuery), responseContext); - return result; + return baseRunner.run(queryPlus.withQuery(newQuery), responseContext); } } } diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index 1198034f639e..d7f39adaa4ba 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -133,7 +133,7 @@ private Query prioritizeAndLaneQuery(Query query, Iterable se { Set segmentServerSelectors = new HashSet<>(); for (Segment s : segments) { - segmentServerSelectors.add(new SegmentServerSelector(null, s.getId().toDescriptor())); + segmentServerSelectors.add(new SegmentServerSelector(s.getId().toDescriptor())); } return scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), segmentServerSelectors); } diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java index 7fef5a9d71c2..cc3a406cfe78 100644 --- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java @@ -165,11 +165,13 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final // Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments. // This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan) - // to function properly. + // to function properly. SegmentServerSelector does not currently mimic CachingClusteredClient, it is using + // the LocalQuerySegmentWalker constructor instead since this walker is not mimic remote DruidServer objects + // to actually serve the queries return (theQuery, responseContext) -> { if (scheduler != null) { Set segments = new HashSet<>(); - specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec))); + specs.forEach(spec -> segments.add(new SegmentServerSelector(spec))); return scheduler.run( scheduler.prioritizeAndLaneQuery(theQuery, segments), new LazySequence<>(