From 7a6905c9e9bfe94f463ea091b521e80cf28308dc Mon Sep 17 00:00:00 2001 From: leventov Date: Wed, 19 Apr 2017 15:26:29 +0300 Subject: [PATCH 1/6] Add QueryPlus. Add QueryRunner.run(QueryPlus, Map) method with default implementation, to replace QueryRunner.run(Query, Map). --- .../benchmark/query/QueryBenchmarkUtil.java | 5 +- .../query/SerializingQueryRunner.java | 6 +- .../query/scan/ScanQueryLimitRowIterator.java | 7 +- .../query/scan/ScanQueryQueryToolChest.java | 18 +-- .../query/scan/ScanQueryRunnerFactory.java | 10 +- .../query/scan/MultiSegmentScanQueryTest.java | 10 +- .../druid/indexing/kafka/KafkaIndexTask.java | 5 +- .../java/io/druid/query/AsyncQueryRunner.java | 5 +- .../io/druid/query/BySegmentQueryRunner.java | 10 +- .../query/BySegmentSkippingQueryRunner.java | 22 +++- .../druid/query/CPUTimeMetricQueryRunner.java | 10 +- .../query/ChainedExecutionQueryRunner.java | 5 +- .../io/druid/query/ConcatQueryRunner.java | 4 +- .../query/FinalizeResultsQueryRunner.java | 5 +- .../druid/query/FluentQueryRunnerBuilder.java | 4 +- .../druid/query/GroupByMergedQueryRunner.java | 8 +- .../query/IntervalChunkingQueryRunner.java | 12 +- .../query/MetricsEmittingQueryRunner.java | 7 +- .../java/io/druid/query/NoopQueryRunner.java | 2 +- .../src/main/java/io/druid/query/Query.java | 10 ++ .../main/java/io/druid/query/QueryPlus.java | 104 ++++++++++++++++++ .../main/java/io/druid/query/QueryRunner.java | 23 +++- .../io/druid/query/QueryRunnerHelper.java | 4 +- .../ReferenceCountingSegmentQueryRunner.java | 6 +- ...portTimelineMissingSegmentQueryRunner.java | 2 +- .../druid/query/ResultMergeQueryRunner.java | 5 +- .../java/io/druid/query/RetryQueryRunner.java | 10 +- .../io/druid/query/SubqueryQueryRunner.java | 8 +- .../java/io/druid/query/TimewarpOperator.java | 8 +- .../java/io/druid/query/UnionQueryRunner.java | 7 +- .../DataSourceMetadataQueryRunnerFactory.java | 10 +- .../DataSourceQueryQueryToolChest.java | 7 +- .../groupby/GroupByQueryQueryToolChest.java | 18 +-- .../groupby/GroupByQueryRunnerFactory.java | 19 ++-- .../GroupByMergingQueryRunnerV2.java | 15 +-- .../groupby/strategy/GroupByStrategyV1.java | 53 ++++----- .../groupby/strategy/GroupByStrategyV2.java | 47 ++++---- .../SegmentMetadataQueryQueryToolChest.java | 6 +- .../SegmentMetadataQueryRunnerFactory.java | 10 +- .../search/SearchQueryQueryToolChest.java | 15 ++- .../druid/query/search/SearchQueryRunner.java | 4 +- .../select/SelectQueryQueryToolChest.java | 8 +- .../select/SelectQueryRunnerFactory.java | 4 +- .../spec/SpecificSegmentQueryRunner.java | 8 +- .../TimeBoundaryQueryQueryToolChest.java | 11 +- .../TimeBoundaryQueryRunnerFactory.java | 4 +- .../TimeseriesQueryQueryToolChest.java | 12 +- .../TimeseriesQueryRunnerFactory.java | 4 +- .../query/topn/TopNQueryQueryToolChest.java | 25 +++-- .../query/topn/TopNQueryRunnerFactory.java | 8 +- .../io/druid/query/AsyncQueryRunnerTest.java | 6 +- .../ChainedExecutionQueryRunnerTest.java | 2 +- .../IntervalChunkingQueryRunnerTest.java | 6 +- .../io/druid/query/QueryRunnerTestHelper.java | 15 +-- .../io/druid/query/RetryQueryRunnerTest.java | 11 +- .../io/druid/query/TimewarpOperatorTest.java | 9 +- .../io/druid/query/UnionQueryRunnerTest.java | 8 +- .../aggregation/AggregationTestHelper.java | 7 +- .../GroupByQueryRunnerFactoryTest.java | 12 +- .../query/groupby/GroupByQueryRunnerTest.java | 52 ++++----- .../GroupByTimeseriesQueryRunnerTest.java | 6 +- .../query/search/SearchQueryRunnerTest.java | 9 +- .../spec/SpecificSegmentQueryRunnerTest.java | 6 +- .../TimeSeriesUnionQueryRunnerTest.java | 6 +- .../topn/TopNQueryQueryToolChestTest.java | 6 +- .../druid/client/CachingClusteredClient.java | 15 +-- .../io/druid/client/CachingQueryRunner.java | 8 +- .../io/druid/client/DirectDruidClient.java | 4 +- .../appenderator/AppenderatorPlumber.java | 9 +- .../java/io/druid/server/QueryResource.java | 3 +- .../client/CachingClusteredClientTest.java | 29 ++--- .../druid/client/CachingQueryRunnerTest.java | 5 +- .../coordination/ServerManagerTest.java | 5 +- .../main/java/io/druid/cli/DumpSegment.java | 3 +- .../io/druid/sql/calcite/rel/QueryMaker.java | 9 +- .../druid/sql/calcite/schema/DruidSchema.java | 3 +- .../SpecificSegmentsQuerySegmentWalker.java | 6 +- 77 files changed, 550 insertions(+), 340 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/QueryPlus.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java b/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java index 662b0ed71e8e..a06bb2ba9757 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/QueryBenchmarkUtil.java @@ -25,6 +25,7 @@ import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -58,9 +59,9 @@ public QueryRunner decorate(final QueryRunner delegate, QueryToolChest> toolChest) { return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return delegate.run(query, responseContext); + return delegate.run(queryPlus, responseContext); } }; } diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java b/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java index 256559685524..7b70beb75d0b 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java @@ -23,7 +23,7 @@ import com.google.common.base.Function; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import java.util.Map; @@ -47,12 +47,12 @@ public SerializingQueryRunner( @Override public Sequence run( - final Query query, + final QueryPlus queryPlus, final Map responseContext ) { return Sequences.map( - baseRunner.run(query, responseContext), + baseRunner.run(queryPlus, responseContext), new Function() { @Override diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java index f102d1c3ec67..d2f07a17da31 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java @@ -22,6 +22,7 @@ import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.java.util.common.parsers.CloseableIterator; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import java.io.IOException; @@ -36,13 +37,15 @@ public class ScanQueryLimitRowIterator implements CloseableIterator baseRunner, ScanQuery query, + QueryRunner baseRunner, + QueryPlus queryPlus, Map responseContext ) { + ScanQuery query = (ScanQuery) queryPlus.getQuery(); resultFormat = query.getResultFormat(); limit = query.getLimit(); - Sequence baseSequence = baseRunner.run(query, responseContext); + Sequence baseSequence = baseRunner.run(queryPlus, responseContext); yielder = baseSequence.toYielder( null, new YieldingAccumulator() diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java index 8cc1c7e3ffce..26edb06c9fd4 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java @@ -25,9 +25,10 @@ import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Sequence; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryMetrics; -import io.druid.query.GenericQueryMetricsFactory; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.aggregation.MetricManipulationFn; @@ -55,12 +56,12 @@ public QueryRunner mergeResults(final QueryRunner run( - final Query query, final Map responseContext + final QueryPlus queryPlus, final Map responseContext ) { - ScanQuery scanQuery = (ScanQuery) query; + ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery(); if (scanQuery.getLimit() == Long.MAX_VALUE) { - return runner.run(query, responseContext); + return runner.run(queryPlus, responseContext); } return new BaseSequence<>( new BaseSequence.IteratorMaker() @@ -68,7 +69,7 @@ public Sequence run( @Override public ScanQueryLimitRowIterator make() { - return new ScanQueryLimitRowIterator(runner, (ScanQuery) query, responseContext); + return new ScanQueryLimitRowIterator(runner, queryPlus, responseContext); } @Override @@ -109,14 +110,15 @@ public QueryRunner preMergeQueryDecoration(final QueryRunner run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { - ScanQuery scanQuery = (ScanQuery) query; + ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery(); if (scanQuery.getDimensionsFilter() != null) { scanQuery = scanQuery.withDimFilter(scanQuery.getDimensionsFilter().optimize()); + queryPlus = queryPlus.withQuery(scanQuery); } - return runner.run(scanQuery, responseContext); + return runner.run(queryPlus, responseContext); } }; } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index fcfe11b43664..0fb7d16a03cb 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -26,6 +26,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; import io.druid.query.QueryContexts; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -70,12 +71,12 @@ public QueryRunner mergeRunners( { @Override public Sequence run( - final Query query, final Map responseContext + final QueryPlus queryPlus, final Map responseContext ) { // Note: this variable is effective only when queryContext has a timeout. // See the comment of CTX_TIMEOUT_AT. - final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query); + final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery()); responseContext.put(CTX_TIMEOUT_AT, timeoutAt); return Sequences.concat( Sequences.map( @@ -85,7 +86,7 @@ public Sequence run( @Override public Sequence apply(final QueryRunner input) { - return input.run(query, responseContext); + return input.run(queryPlus, responseContext); } } ) @@ -113,9 +114,10 @@ public ScanQueryRunner(ScanQueryEngine engine, Segment segment) @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { + Query query = queryPlus.getQuery(); if (!(query instanceof ScanQuery)) { throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class); } diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java index 8f088ad8a687..254f4dceda96 100644 --- a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java @@ -28,7 +28,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.DefaultGenericQueryMetricsFactory; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -214,15 +214,15 @@ public void testMergeResultsWithLimit() new QueryRunner() { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate results back from 2 historicals List> sequences = Lists.newArrayListWithExpectedSize(2); - sequences.add(factory.createRunner(segment0).run(query, new HashMap())); - sequences.add(factory.createRunner(segment1).run(query, new HashMap())); + sequences.add(factory.createRunner(segment0).run(queryPlus, new HashMap())); + sequences.add(factory.createRunner(segment1).run(queryPlus, new HashMap())); return new MergeSequence<>( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple(sequences) ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index b344b8a3e9b5..d6e34082fe7e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -57,6 +57,7 @@ import io.druid.query.DruidMetrics; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; @@ -625,9 +626,9 @@ public QueryRunner getQueryRunner(Query query) return new QueryRunner() { @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - return query.run(appenderator, responseContext); + return queryPlus.run(appenderator, responseContext); } }; } diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java index ce5f1fc64b1b..8de2a2b17d7c 100644 --- a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -47,8 +47,9 @@ public AsyncQueryRunner(QueryRunner baseRunner, ExecutorService executor, Que } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { + final Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) { @@ -57,7 +58,7 @@ public Sequence call() throws Exception { //Note: this is assumed that baseRunner does most of the work eagerly on call to the //run() method and resulting sequence accumulate/yield is fast. - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } }); queryWatcher.registerQuery(query, future); diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index e57b9471ce3b..e3781825aadf 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -49,10 +49,10 @@ public BySegmentQueryRunner( @Override @SuppressWarnings("unchecked") - public Sequence run(final Query query, Map responseContext) + public Sequence run(final QueryPlus queryPlus, Map responseContext) { - if (QueryContexts.isBySegment(query)) { - final Sequence baseSequence = base.run(query, responseContext); + if (QueryContexts.isBySegment(queryPlus.getQuery())) { + final Sequence baseSequence = base.run(queryPlus, responseContext); final List results = Sequences.toList(baseSequence, Lists.newArrayList()); return Sequences.simple( Arrays.asList( @@ -61,12 +61,12 @@ public Sequence run(final Query query, Map responseContext new BySegmentResultValueClass( results, segmentIdentifier, - query.getIntervals().get(0) + queryPlus.getQuery().getIntervals().get(0) ) ) ) ); } - return base.run(query, responseContext); + return base.run(queryPlus, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index 373c2e3b1192..9c103a29d747 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -38,14 +38,26 @@ public BySegmentSkippingQueryRunner( } @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - if (QueryContexts.isBySegment(query)) { - return baseRunner.run(query, responseContext); + if (QueryContexts.isBySegment(queryPlus.getQuery())) { + return baseRunner.run(queryPlus, responseContext); } - return doRun(baseRunner, query, responseContext); + return doRun(baseRunner, queryPlus, responseContext); } - protected abstract Sequence doRun(QueryRunner baseRunner, Query query, Map context); + /** + * @deprecated override {@link #doRun(QueryRunner, QueryPlus, Map)} instead + */ + @Deprecated + protected Sequence doRun(QueryRunner baseRunner, Query query, Map context) + { + return doRun(baseRunner, QueryPlus.wrap(query), context); + } + + protected Sequence doRun(QueryRunner baseRunner, QueryPlus queryPlus, Map context) + { + return doRun(baseRunner, queryPlus.getQuery(), context); + } } diff --git a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java index 7aefbb7ad472..141f2bd075bb 100644 --- a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java @@ -58,11 +58,11 @@ private CPUTimeMetricQueryRunner( @Override - public Sequence run( - final Query query, final Map responseContext - ) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final Sequence baseSequence = delegate.run(query, responseContext); + final QueryPlus queryWithMetrics = + queryPlus.withQueryMetrics((QueryToolChest>) queryToolChest); + final Sequence baseSequence = delegate.run(queryWithMetrics, responseContext); return Sequences.wrap( baseSequence, new SequenceWrapper() @@ -84,7 +84,7 @@ public void after(boolean isDone, Throwable thrown) throws Exception if (report) { final long cpuTimeNs = cpuTimeAccumulator.get(); if (cpuTimeNs > 0) { - queryToolChest.makeMetrics(query).reportCpuTime(cpuTimeNs).emit(emitter); + queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter); } } } diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 117d6436082b..7ef86f51a4df 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -89,8 +89,9 @@ public ChainedExecutionQueryRunner( } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { + Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); final Ordering ordering = query.getResultOrdering(); @@ -121,7 +122,7 @@ public ListenableFuture> apply(final QueryRunner input) public Iterable call() throws Exception { try { - Sequence result = input.run(query, responseContext); + Sequence result = input.run(queryPlus, responseContext); if (result == null) { throw new ISE("Got a null result! Segments are missing!"); } diff --git a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java index e36ec4cdc2d1..2237ad8c5b1a 100644 --- a/processing/src/main/java/io/druid/query/ConcatQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ConcatQueryRunner.java @@ -38,7 +38,7 @@ public ConcatQueryRunner( } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { return Sequences.concat( Sequences.map( @@ -48,7 +48,7 @@ public Sequence run(final Query query, final Map responseC @Override public Sequence apply(final QueryRunner input) { - return input.run(query, responseContext); + return input.run(queryPlus, responseContext); } } ) diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 7fe58ee06b90..acf52b2aea46 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -47,8 +47,9 @@ public FinalizeResultsQueryRunner( } @Override - public Sequence run(final Query query, Map responseContext) + public Sequence run(final QueryPlus queryPlus, Map responseContext) { + final Query query = queryPlus.getQuery(); final boolean isBySegment = QueryContexts.isBySegment(query); final boolean shouldFinalize = QueryContexts.isFinalize(query, true); @@ -100,7 +101,7 @@ public T apply(T input) return Sequences.map( - baseRunner.run(queryToRun, responseContext), + baseRunner.run(queryPlus.withQuery(queryToRun), responseContext), finalizerFn ); diff --git a/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java b/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java index 71bf44451cda..a52da78c9ef5 100644 --- a/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java +++ b/processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java @@ -49,10 +49,10 @@ public FluentQueryRunner(QueryRunner runner) @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } public FluentQueryRunner from(QueryRunner runner) { diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index 77775f295c6f..295e79eee23f 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -78,9 +78,9 @@ public GroupByMergedQueryRunner( } @Override - public Sequence run(final Query queryParam, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final GroupByQuery query = (GroupByQuery) queryParam; + final GroupByQuery query = (GroupByQuery) queryPlus.getQuery(); final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query); final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded(); final Pair> indexAccumulatorPair = GroupByQueryHelper.createIndexAccumulatorPair( @@ -114,10 +114,10 @@ public Void call() throws Exception { try { if (bySegment) { - input.run(queryParam, responseContext) + input.run(queryPlus, responseContext) .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); } else { - input.run(queryParam, responseContext) + input.run(queryPlus, responseContext) .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); } diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java index 5e8b529b1535..a6b26072f6df 100644 --- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java @@ -63,18 +63,18 @@ public IntervalChunkingQueryRunner( } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final Period chunkPeriod = getChunkPeriod(query); + final Period chunkPeriod = getChunkPeriod(queryPlus.getQuery()); // Check for non-empty chunkPeriod, avoiding toStandardDuration since that cannot handle periods like P1M. if (EPOCH.plus(chunkPeriod).getMillis() == EPOCH.getMillis()) { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } List chunkIntervals = Lists.newArrayList( FunctionalIterable - .create(query.getIntervals()) + .create(queryPlus.getQuery().getIntervals()) .transformCat( new Function>() { @@ -88,7 +88,7 @@ public Iterable apply(Interval input) ); if (chunkIntervals.size() <= 1) { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } return Sequences.concat( @@ -113,7 +113,7 @@ public Sequence apply(Interval singleInterval) ), executor, queryWatcher ).run( - query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), + queryPlus.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(singleInterval))), responseContext ); } diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 096fb39d45fc..582bdae10a30 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -82,9 +82,10 @@ public MetricsEmittingQueryRunner withWaitMeasuredFromNow() } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final QueryMetrics> queryMetrics = queryToolChest.makeMetrics(query); + QueryPlus queryWithMetrics = queryPlus.withQueryMetrics((QueryToolChest>) queryToolChest); + final QueryMetrics> queryMetrics = (QueryMetrics>) queryWithMetrics.getQueryMetrics(); applyCustomDimensions.accept(queryMetrics); @@ -97,7 +98,7 @@ public Sequence run(final Query query, final Map responseC @Override public Sequence get() { - return queryRunner.run(query, responseContext); + return queryRunner.run(queryWithMetrics, responseContext); } }), new SequenceWrapper() diff --git a/processing/src/main/java/io/druid/query/NoopQueryRunner.java b/processing/src/main/java/io/druid/query/NoopQueryRunner.java index 3d3aa19b30d2..b058a4d89399 100644 --- a/processing/src/main/java/io/druid/query/NoopQueryRunner.java +++ b/processing/src/main/java/io/druid/query/NoopQueryRunner.java @@ -29,7 +29,7 @@ public class NoopQueryRunner implements QueryRunner { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return Sequences.empty(); } diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index cfbf6f2d3403..c6c3c281c02a 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -70,8 +70,18 @@ public interface Query String getType(); + /** + * @deprecated use {@link QueryPlus#run(QuerySegmentWalker, Map)} instead. This method could be removed in the next + * minor or major version of Druid. + */ + @Deprecated Sequence run(QuerySegmentWalker walker, Map context); + /** + * @deprecated use {@link QueryRunner#run(QueryPlus, Map)} instead. This method could be removed in the next minor or + * major version of Druid. + */ + @Deprecated Sequence run(QueryRunner runner, Map context); List getIntervals(); diff --git a/processing/src/main/java/io/druid/query/QueryPlus.java b/processing/src/main/java/io/druid/query/QueryPlus.java new file mode 100644 index 000000000000..ae6977011776 --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryPlus.java @@ -0,0 +1,104 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.google.common.base.Preconditions; +import io.druid.java.util.common.guava.Sequence; +import io.druid.query.spec.QuerySegmentSpec; + +import javax.annotation.Nullable; +import java.util.Map; + +/** + * An immutable composite object of {@link Query} + extra stuff needed in {@link QueryRunner}s. This "extra stuff" + * is only {@link QueryMetrics} yet. + */ +public final class QueryPlus +{ + /** + * Returns the minimum bare QueryPlus object with the given query. {@link #getQueryMetrics()} of the QueryPlus object, + * returned from this factory method, returns {@code null}. + */ + public static QueryPlus wrap(Query query) + { + Preconditions.checkNotNull(query); + return new QueryPlus<>(query, null); + } + + private final Query query; + private final QueryMetrics queryMetrics; + + private QueryPlus(Query query, QueryMetrics queryMetrics) + { + this.query = query; + this.queryMetrics = queryMetrics; + } + + public Query getQuery() + { + return query; + } + + @Nullable + public QueryMetrics getQueryMetrics() + { + return queryMetrics; + } + + /** + * Returns the same QueryPlus object, if it already has {@link QueryMetrics} ({@link #getQueryMetrics()} returns not + * null), or returns a new QueryPlus object with {@link Query} from this QueryPlus and QueryMetrics created using the + * given {@link QueryToolChest}, via {@link QueryToolChest#makeMetrics(Query)} method. + */ + public QueryPlus withQueryMetrics(QueryToolChest> queryToolChest) + { + if (queryMetrics != null) { + return this; + } else { + return new QueryPlus<>(query, ((QueryToolChest) queryToolChest).makeMetrics(query)); + } + } + + /** + * Equivalent of withQuery(getQuery().withQuerySegmentSpec(spec)). + */ + public QueryPlus withQuerySegmentSpec(QuerySegmentSpec spec) + { + return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics); + } + + /** + * Returns a QueryPlus object with {@link QueryMetrics} from this QueryPlus object, and the provided {@link Query}. + */ + public QueryPlus withQuery(Query replacementQuery) + { + return new QueryPlus<>(replacementQuery, queryMetrics); + } + + public Sequence run(QuerySegmentWalker walker, Map context) + { + if (query instanceof BaseQuery) { + return ((BaseQuery) query).getQuerySegmentSpec().lookup(query, walker).run(this, context); + } else { + // fallback + return query.run(walker, context); + } + } +} diff --git a/processing/src/main/java/io/druid/query/QueryRunner.java b/processing/src/main/java/io/druid/query/QueryRunner.java index 58cdf9d70813..dc8b260a58d9 100644 --- a/processing/src/main/java/io/druid/query/QueryRunner.java +++ b/processing/src/main/java/io/druid/query/QueryRunner.java @@ -24,14 +24,27 @@ import java.util.Map; /** + * This interface has two similar run() methods. {@link #run(Query, Map)} is legacy and {@link #run(QueryPlus, Map)} + * is the new one. Their default implementations delegate to each other. Every implementation of QueryRunner should + * override only one of those methods. New implementations should override the new method: {@link #run(QueryPlus, Map)}. */ public interface QueryRunner { /** - * Runs the given query and returns results in a time-ordered sequence - * @param query - * @param responseContext - * @return + * @deprecated use and override {@link #run(QueryPlus, Map)} instead. This method could be removed in the next minor + * or major version of Druid. */ - Sequence run(Query query, Map responseContext); + @Deprecated + default Sequence run(Query query, Map responseContext) + { + return run(QueryPlus.wrap(query), responseContext); + } + + /** + * Runs the given query and returns results in a time-ordered sequence. + */ + default Sequence run(QueryPlus queryPlus, Map responseContext) + { + return run(queryPlus.getQuery(), responseContext); + } } diff --git a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java index 5f9a62ccdf24..ed619f2711cf 100644 --- a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java +++ b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java @@ -77,9 +77,9 @@ public static QueryRunner makeClosingQueryRunner(final QueryRunner ru return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return Sequences.withBaggage(runner.run(query, responseContext), closeable); + return Sequences.withBaggage(runner.run(queryPlus, responseContext), closeable); } }; } diff --git a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java index a85a6f911286..d513e0794098 100644 --- a/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ReferenceCountingSegmentQueryRunner.java @@ -47,12 +47,12 @@ public ReferenceCountingSegmentQueryRunner( } @Override - public Sequence run(final Query query, Map responseContext) + public Sequence run(final QueryPlus queryPlus, Map responseContext) { final Closeable closeable = adapter.increment(); if (closeable != null) { try { - final Sequence baseSequence = factory.createRunner(adapter).run(query, responseContext); + final Sequence baseSequence = factory.createRunner(adapter).run(queryPlus, responseContext); return Sequences.withBaggage(baseSequence, closeable); } @@ -62,7 +62,7 @@ public Sequence run(final Query query, Map responseContext } } else { // Segment was closed before we had a chance to increment the reference count - return new ReportTimelineMissingSegmentQueryRunner(descriptor).run(query, responseContext); + return new ReportTimelineMissingSegmentQueryRunner(descriptor).run(queryPlus, responseContext); } } } diff --git a/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java index a8b6c5e2ffd6..7b60899249bb 100644 --- a/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ReportTimelineMissingSegmentQueryRunner.java @@ -39,7 +39,7 @@ public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor) @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { List missingSegments = (List) responseContext.get(Result.MISSING_SEGMENTS_KEY); diff --git a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java index b6276a107f3c..e160cbf75b59 100644 --- a/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ResultMergeQueryRunner.java @@ -38,9 +38,10 @@ public ResultMergeQueryRunner( } @Override - public Sequence doRun(QueryRunner baseRunner, Query query, Map context) + public Sequence doRun(QueryRunner baseRunner, QueryPlus queryPlus, Map context) { - return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query)); + Query query = queryPlus.getQuery(); + return CombiningSequence.create(baseRunner.run(queryPlus, context), makeOrdering(query), createMergeFn(query)); } protected abstract Ordering makeOrdering(Query query); diff --git a/processing/src/main/java/io/druid/query/RetryQueryRunner.java b/processing/src/main/java/io/druid/query/RetryQueryRunner.java index 57a5cde5124b..dc0a6df06e8d 100644 --- a/processing/src/main/java/io/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/RetryQueryRunner.java @@ -61,10 +61,10 @@ public RetryQueryRunner( } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final QueryPlus queryPlus, final Map context) { final List> listOfSequences = Lists.newArrayList(); - listOfSequences.add(baseRunner.run(query, context)); + listOfSequences.add(baseRunner.run(queryPlus, context)); return new YieldingSequenceBase() { @@ -80,12 +80,12 @@ public Yielder toYielder( log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i); context.put(Result.MISSING_SEGMENTS_KEY, Lists.newArrayList()); - final Query retryQuery = query.withQuerySegmentSpec( + final QueryPlus retryQueryPlus = queryPlus.withQuerySegmentSpec( new MultipleSpecificSegmentSpec( missingSegments ) ); - Sequence retrySequence = baseRunner.run(retryQuery, context); + Sequence retrySequence = baseRunner.run(retryQueryPlus, context); listOfSequences.add(retrySequence); missingSegments = getMissingSegments(context); if (missingSegments.isEmpty()) { @@ -99,7 +99,7 @@ public Yielder toYielder( } return new MergeSequence<>( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple(listOfSequences)).toYielder( initValue, accumulator ); diff --git a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java index e3dc7356c243..b2bc013e1dc8 100644 --- a/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java +++ b/processing/src/main/java/io/druid/query/SubqueryQueryRunner.java @@ -36,13 +36,13 @@ public SubqueryQueryRunner(QueryRunner baseRunner) } @Override - public Sequence run(final Query query, Map responseContext) + public Sequence run(final QueryPlus queryPlus, Map responseContext) { - DataSource dataSource = query.getDataSource(); + DataSource dataSource = queryPlus.getQuery().getDataSource(); if (dataSource instanceof QueryDataSource) { - return run((Query) ((QueryDataSource) dataSource).getQuery(), responseContext); + return run(queryPlus.withQuery((Query) ((QueryDataSource) dataSource).getQuery()), responseContext); } else { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } } } diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java index 0444d95d518c..d6a9e8ac4593 100644 --- a/processing/src/main/java/io/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java @@ -81,18 +81,18 @@ public QueryRunner postProcess(final QueryRunner baseRunner, final long no return new QueryRunner() { @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { final long offset = computeOffset(now); - final Interval interval = query.getIntervals().get(0); + final Interval interval = queryPlus.getQuery().getIntervals().get(0); final Interval modifiedInterval = new Interval( Math.min(interval.getStartMillis() + offset, now + offset), Math.min(interval.getEndMillis() + offset, now + offset) ); return Sequences.map( baseRunner.run( - query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))), + queryPlus.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval))), responseContext ), new Function() @@ -113,7 +113,7 @@ public T apply(T input) final DateTime maxTime = boundary.getMaxTime(); - return (T) ((TimeBoundaryQuery) query).buildResult( + return (T) ((TimeBoundaryQuery) queryPlus.getQuery()).buildResult( new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)), minTime != null ? minTime.minus(offset) : null, maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index cfb337d9c94d..815838416c57 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -39,8 +39,9 @@ public UnionQueryRunner( } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { + Query query = queryPlus.getQuery(); DataSource dataSource = query.getDataSource(); if (dataSource instanceof UnionDataSource) { @@ -55,7 +56,7 @@ public Sequence run(final Query query, final Map responseC public Sequence apply(DataSource singleSource) { return baseRunner.run( - query.withDataSource(singleSource), + queryPlus.withQuery(query.withDataSource(singleSource)), responseContext ); } @@ -64,7 +65,7 @@ public Sequence apply(DataSource singleSource) ) ); } else { - return baseRunner.run(query, responseContext); + return baseRunner.run(queryPlus, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java index c137f75c9b1c..b60522bd6960 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java @@ -25,6 +25,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -88,15 +89,16 @@ public DataSourceMetadataQueryRunner(Segment segment) @Override public Sequence> run( - Query> input, + QueryPlus> input, Map responseContext ) { - if (!(input instanceof DataSourceMetadataQuery)) { - throw new ISE("Got a [%s] which isn't a %s", input.getClass().getCanonicalName(), DataSourceMetadataQuery.class); + Query> query = input.getQuery(); + if (!(query instanceof DataSourceMetadataQuery)) { + throw new ISE("Got a [%s] which isn't a %s", query.getClass().getCanonicalName(), DataSourceMetadataQuery.class); } - final DataSourceMetadataQuery legacyQuery = (DataSourceMetadataQuery) input; + final DataSourceMetadataQuery legacyQuery = (DataSourceMetadataQuery) query; return new BaseSequence<>( new BaseSequence.IteratorMaker, Iterator>>() diff --git a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java index 1c5b2eb00ea7..03b52599cc31 100644 --- a/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/datasourcemetadata/DataSourceQueryQueryToolChest.java @@ -33,6 +33,7 @@ import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -93,15 +94,15 @@ public QueryRunner> mergeResults( @Override protected Sequence> doRun( QueryRunner> baseRunner, - Query> input, + QueryPlus> input, Map context ) { - DataSourceMetadataQuery query = (DataSourceMetadataQuery) input; + DataSourceMetadataQuery query = (DataSourceMetadataQuery) input.getQuery(); return Sequences.simple( query.mergeResults( Sequences.toList( - baseRunner.run(query, context), + baseRunner.run(input, context), Lists.>newArrayList() ) ) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index ddf9c4277c3f..19d8c541d7da 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -41,9 +41,9 @@ import io.druid.query.CacheStrategy; import io.druid.query.DataSource; import io.druid.query.IntervalChunkingQueryRunnerDecorator; -import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SubqueryQueryRunner; @@ -113,13 +113,13 @@ public QueryRunner mergeResults(final QueryRunner runner) return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - if (QueryContexts.isBySegment(query)) { - return runner.run(query, responseContext); + if (QueryContexts.isBySegment(queryPlus.getQuery())) { + return runner.run(queryPlus, responseContext); } - final GroupByQuery groupByQuery = (GroupByQuery) query; + final GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery(); if (strategySelector.strategize(groupByQuery).doMergeResults(groupByQuery)) { return initAndMergeGroupByResults( groupByQuery, @@ -127,7 +127,7 @@ public Sequence run(Query query, Map responseContext) responseContext ); } - return runner.run(query, responseContext); + return runner.run(queryPlus, responseContext); } }; } @@ -327,9 +327,9 @@ public QueryRunner preMergeQueryDecoration(final QueryRunner runner) new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - GroupByQuery groupByQuery = (GroupByQuery) query; + GroupByQuery groupByQuery = (GroupByQuery) queryPlus.getQuery(); if (groupByQuery.getDimFilter() != null) { groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize()); } @@ -365,7 +365,7 @@ public String apply(DimensionSpec input) GroupByQueryQueryToolChest.this ) .run( - delegateGroupByQuery.withDimensionSpecs(dimensionSpecs), + queryPlus.withQuery(delegateGroupByQuery.withDimensionSpecs(dimensionSpecs)), responseContext ); } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index 5f677e0cf7c6..c35372476867 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -26,6 +26,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -68,12 +69,13 @@ public QueryRunner mergeRunners(final ExecutorService exec, final Iterable< return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return strategySelector.strategize((GroupByQuery) query).mergeRunners(queryExecutor, queryRunners).run( - query, - responseContext + QueryRunner rowQueryRunner = strategySelector.strategize((GroupByQuery) queryPlus.getQuery()).mergeRunners( + queryExecutor, + queryRunners ); + return rowQueryRunner.run(queryPlus, responseContext); } }; } @@ -96,13 +98,14 @@ public GroupByQueryRunner(Segment segment, final GroupByStrategySelector strateg } @Override - public Sequence run(Query input, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - if (!(input instanceof GroupByQuery)) { - throw new ISE("Got a [%s] which isn't a %s", input.getClass(), GroupByQuery.class); + Query query = queryPlus.getQuery(); + if (!(query instanceof GroupByQuery)) { + throw new ISE("Got a [%s] which isn't a %s", query.getClass(), GroupByQuery.class); } - return strategySelector.strategize((GroupByQuery) input).process((GroupByQuery) input, adapter); + return strategySelector.strategize((GroupByQuery) query).process((GroupByQuery) query, adapter); } } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index de6fee13b21a..549f27becdc1 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -45,9 +45,9 @@ import io.druid.java.util.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; import io.druid.query.ChainedExecutionQueryRunner; -import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; @@ -105,9 +105,9 @@ public GroupByMergingQueryRunnerV2( } @Override - public Sequence run(final Query queryParam, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final GroupByQuery query = (GroupByQuery) queryParam; + final GroupByQuery query = (GroupByQuery) queryPlus.getQuery(); final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); // CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION is here because realtime servers use nested mergeRunners calls @@ -119,12 +119,13 @@ public Sequence run(final Query queryParam, final Map CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, false ); - final GroupByQuery queryForRunners = query.withOverriddenContext( - ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true) + final QueryPlus queryPlusForRunners = queryPlus.withQuery( + query.withOverriddenContext(ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true)) ); if (QueryContexts.isBySegment(query) || forceChainedExecution) { - return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(query, responseContext); + // TODO why query, not queryForRunners? + return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(queryPlus, responseContext); } final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded(); @@ -225,7 +226,7 @@ public AggregateResult call() throws Exception Releaser bufferReleaser = mergeBufferHolder.increment(); Releaser grouperReleaser = grouperHolder.increment() ) { - final AggregateResult retVal = input.run(queryForRunners, responseContext) + final AggregateResult retVal = input.run(queryPlusForRunners, responseContext) .accumulate( AggregateResult.ok(), accumulator diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 9b2c9163818b..810ce253e1be 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -37,6 +37,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.GroupByMergedQueryRunner; import io.druid.query.IntervalChunkingQueryRunnerDecorator; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; import io.druid.query.aggregation.AggregatorFactory; @@ -119,31 +120,33 @@ public Sequence mergeResults( configSupplier.get(), bufferPool, baseRunner.run( - new GroupByQuery( - query.getDataSource(), - query.getQuerySegmentSpec(), - query.getVirtualColumns(), - query.getDimFilter(), - query.getGranularity(), - query.getDimensions(), - query.getAggregatorSpecs(), - // Don't do post aggs until the end of this method. - ImmutableList.of(), - // Don't do "having" clause until the end of this method. - null, - null, - query.getContext() - ).withOverriddenContext( - ImmutableMap.of( - "finalize", false, - //setting sort to false avoids unnecessary sorting while merging results. we only need to sort - //in the end when returning results to user. (note this is only respected by groupBy v1) - GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false, - //no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would return - //merged results. (note this is only respected by groupBy v1) - GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false, - GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1 - ) + QueryPlus.wrap( + new GroupByQuery( + query.getDataSource(), + query.getQuerySegmentSpec(), + query.getVirtualColumns(), + query.getDimFilter(), + query.getGranularity(), + query.getDimensions(), + query.getAggregatorSpecs(), + // Don't do post aggs until the end of this method. + ImmutableList.of(), + // Don't do "having" clause until the end of this method. + null, + null, + query.getContext() + ).withOverriddenContext( + ImmutableMap.of( + "finalize", false, + //setting sort to false avoids unnecessary sorting while merging results. we only need to sort + //in the end when returning results to user. (note this is only respected by groupBy v1) + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false, + //no merging needed at historicals because GroupByQueryRunnerFactory.mergeRunners(..) would return + //merged results. (note this is only respected by groupBy v1) + GroupByQueryQueryToolChest.GROUP_BY_MERGE_KEY, false, + GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V1 + ) + ) ), responseContext ), diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index e0e7273f572b..84257b67a3d6 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -48,6 +48,7 @@ import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; @@ -232,27 +233,29 @@ protected BinaryFn createMergeFn(Query queryParam) return query.applyLimit( Sequences.map( mergingQueryRunner.run( - new GroupByQuery( - query.getDataSource(), - query.getQuerySegmentSpec(), - query.getVirtualColumns(), - query.getDimFilter(), - query.getGranularity(), - query.getDimensions(), - query.getAggregatorSpecs(), - // Don't do post aggs until the end of this method. - ImmutableList.of(), - // Don't do "having" clause until the end of this method. - null, - null, - query.getContext() - ).withOverriddenContext( - ImmutableMap.of( - "finalize", false, - GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2, - CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()), - CTX_KEY_OUTERMOST, false - ) + QueryPlus.wrap( + new GroupByQuery( + query.getDataSource(), + query.getQuerySegmentSpec(), + query.getVirtualColumns(), + query.getDimFilter(), + query.getGranularity(), + query.getDimensions(), + query.getAggregatorSpecs(), + // Don't do post aggs until the end of this method. + ImmutableList.of(), + // Don't do "having" clause until the end of this method. + null, + null, + query.getContext() + ).withOverriddenContext( + ImmutableMap.of( + "finalize", false, + GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2, + CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()), + CTX_KEY_OUTERMOST, false + ) + ) ), responseContext ), @@ -310,7 +313,7 @@ public Sequence processSubqueryResult( return mergeResults(new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return results; } diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index a13f87ccd390..5e284b01dbb7 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -43,6 +43,7 @@ import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.ResultMergeQueryRunner; @@ -102,13 +103,14 @@ public QueryRunner mergeResults(final QueryRunner doRun( QueryRunner baseRunner, - Query query, + QueryPlus queryPlus, Map context ) { + Query query = queryPlus.getQuery(); return new MappedSequence<>( CombiningSequence.create( - baseRunner.run(query, context), + baseRunner.run(queryPlus, context), makeOrdering(query), createMergeFn(query) ), diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 475d98453df9..5c4aaaceca35 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -36,6 +36,7 @@ import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -83,9 +84,9 @@ public QueryRunner createRunner(final Segment segment) return new QueryRunner() { @Override - public Sequence run(Query inQ, Map responseContext) + public Sequence run(QueryPlus inQ, Map responseContext) { - SegmentMetadataQuery query = (SegmentMetadataQuery) inQ; + SegmentMetadataQuery query = (SegmentMetadataQuery) inQ.getQuery(); final SegmentAnalyzer analyzer = new SegmentAnalyzer(query.getAnalysisTypes()); final Map analyzedColumns = analyzer.analyze(segment); final long numRows = analyzer.numRows(segment); @@ -197,10 +198,11 @@ public QueryRunner apply(final QueryRunner inp { @Override public Sequence run( - final Query query, + final QueryPlus queryPlus, final Map responseContext ) { + final Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) @@ -209,7 +211,7 @@ public Sequence run( public Sequence call() throws Exception { return Sequences.simple( - Sequences.toList(input.run(query, responseContext), new ArrayList<>()) + Sequences.toList(input.run(queryPlus, responseContext), new ArrayList<>()) ); } } diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index d84a7396e3e6..592418e0a8c8 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -42,6 +42,7 @@ import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -348,14 +349,15 @@ public QueryRunner> preMergeQueryDecoration(final Quer { @Override public Sequence> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - SearchQuery searchQuery = (SearchQuery) query; + SearchQuery searchQuery = (SearchQuery) queryPlus.getQuery(); if (searchQuery.getDimensionsFilter() != null) { searchQuery = searchQuery.withDimFilter(searchQuery.getDimensionsFilter().optimize()); + queryPlus = queryPlus.withQuery(searchQuery); } - return runner.run(searchQuery, responseContext); + return runner.run(queryPlus, responseContext); } } , this), config @@ -378,23 +380,24 @@ public SearchThresholdAdjustingQueryRunner( @Override public Sequence> run( - Query> input, + QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof SearchQuery)) { throw new ISE("Can only handle [%s], got [%s]", SearchQuery.class, input.getClass()); } final SearchQuery query = (SearchQuery) input; if (query.getLimit() < config.getMaxSearchLimit()) { - return runner.run(query, responseContext); + return runner.run(queryPlus, responseContext); } final boolean isBySegment = QueryContexts.isBySegment(query); return Sequences.map( - runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext), + runner.run(queryPlus.withQuery(query.withLimit(config.getMaxSearchLimit())), responseContext), new Function, Result>() { @Override diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java index cde2e4567dbc..9367efb5f08d 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java @@ -30,6 +30,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.Result; import io.druid.query.dimension.ColumnSelectorStrategy; @@ -186,10 +187,11 @@ public void updateSearchResultSet( @Override public Sequence> run( - final Query> input, + final QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof SearchQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SearchQuery.class); } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java index 45948efbe71e..7dabd959ec18 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryQueryToolChest.java @@ -43,6 +43,7 @@ import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -348,14 +349,15 @@ public QueryRunner> preMergeQueryDecoration(final Quer { @Override public Sequence> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - SelectQuery selectQuery = (SelectQuery) query; + SelectQuery selectQuery = (SelectQuery) queryPlus.getQuery(); if (selectQuery.getDimensionsFilter() != null) { selectQuery = selectQuery.withDimFilter(selectQuery.getDimensionsFilter().optimize()); + queryPlus = queryPlus.withQuery(selectQuery); } - return runner.run(selectQuery, responseContext); + return runner.run(queryPlus, responseContext); } }, this); } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java index 19e4bbd1d2ce..2b6d2359d1a2 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryRunnerFactory.java @@ -24,6 +24,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -90,10 +91,11 @@ private SelectQueryRunner(SelectQueryEngine engine, Segment segment) @Override public Sequence> run( - Query> input, + QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof SelectQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), SelectQuery.class); } diff --git a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java index c331ef7556d2..e60c545c5cf3 100644 --- a/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/spec/SpecificSegmentQueryRunner.java @@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.Yielders; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; @@ -55,9 +56,10 @@ public SpecificSegmentQueryRunner( } @Override - public Sequence run(final Query input, final Map responseContext) + public Sequence run(final QueryPlus input, final Map responseContext) { - final Query query = input.withQuerySegmentSpec(specificSpec); + final QueryPlus queryPlus = input.withQuerySegmentSpec(specificSpec); + final Query query = queryPlus.getQuery(); final Thread currThread = Thread.currentThread(); final String currThreadName = currThread.getName(); @@ -69,7 +71,7 @@ public Sequence run(final Query input, final Map responseC @Override public Sequence get() { - return base.run(query, responseContext); + return base.run(queryPlus, responseContext); } } ); diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java index a6569baed713..8c7abc576d29 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryQueryToolChest.java @@ -32,9 +32,10 @@ import io.druid.query.BySegmentSkippingQueryRunner; import io.druid.query.CacheStrategy; import io.druid.query.DefaultGenericQueryMetricsFactory; +import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryMetrics; -import io.druid.query.GenericQueryMetricsFactory; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -109,13 +110,15 @@ public QueryRunner> mergeResults( { @Override protected Sequence> doRun( - QueryRunner> baseRunner, Query> input, Map context + QueryRunner> baseRunner, + QueryPlus> input, + Map context ) { - TimeBoundaryQuery query = (TimeBoundaryQuery) input; + TimeBoundaryQuery query = (TimeBoundaryQuery) input.getQuery(); return Sequences.simple( query.mergeResults( - Sequences.toList(baseRunner.run(query, context), Lists.>newArrayList()) + Sequences.toList(baseRunner.run(input, context), Lists.>newArrayList()) ) ); } diff --git a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index e3b383cc78e3..fd5fe25ca5af 100644 --- a/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerHelper; @@ -130,10 +131,11 @@ private DateTime getTimeBoundary(StorageAdapter adapter, TimeBoundaryQuery legac @Override public Sequence> run( - final Query> input, + final QueryPlus> queryPlus, final Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof TimeBoundaryQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeBoundaryQuery.class); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 11f9209bedde..473775b71bc0 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -33,6 +33,7 @@ import io.druid.query.CacheStrategy; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -92,14 +93,14 @@ public QueryRunner> mergeResults( @Override public Sequence> doRun( QueryRunner> baseRunner, - Query> query, + QueryPlus> queryPlus, Map context ) { return super.doRun( baseRunner, // Don't do post aggs until makePostComputeManipulatorFn() is called - ((TimeseriesQuery) query).withPostAggregatorSpecs(ImmutableList.of()), + queryPlus.withQuery(((TimeseriesQuery) queryPlus.getQuery()).withPostAggregatorSpecs(ImmutableList.of())), context ); } @@ -234,14 +235,15 @@ public QueryRunner> preMergeQueryDecoration(final { @Override public Sequence> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - TimeseriesQuery timeseriesQuery = (TimeseriesQuery) query; + TimeseriesQuery timeseriesQuery = (TimeseriesQuery) queryPlus.getQuery(); if (timeseriesQuery.getDimensionsFilter() != null) { timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize()); + queryPlus = queryPlus.withQuery(timeseriesQuery); } - return runner.run(timeseriesQuery, responseContext); + return runner.run(queryPlus, responseContext); } }, this); } diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index e6ff92b16e21..79922df83e91 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -24,6 +24,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -91,10 +92,11 @@ private TimeseriesQueryRunner(TimeseriesQueryEngine engine, StorageAdapter adapt @Override public Sequence> run( - Query> input, + QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof TimeseriesQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeseriesQuery.class); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 7d9be82c6fb4..12e489a85a65 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -37,6 +37,7 @@ import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; import io.druid.query.QueryContexts; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -416,26 +417,27 @@ public QueryRunner> preMergeQueryDecoration(final QueryR { @Override public Sequence> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - TopNQuery topNQuery = (TopNQuery) query; + TopNQuery topNQuery = (TopNQuery) queryPlus.getQuery(); if (topNQuery.getDimensionsFilter() != null) { topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize()); } final TopNQuery delegateTopNQuery = topNQuery; if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) { final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec(); - return runner.run( + QueryPlus> delegateQueryPlus = queryPlus.withQuery( delegateTopNQuery.withDimensionSpec( new DefaultDimensionSpec( dimensionSpec.getDimension(), dimensionSpec.getOutputName() ) - ), responseContext + ) ); + return runner.run(delegateQueryPlus, responseContext); } else { - return runner.run(delegateTopNQuery, responseContext); + return runner.run(queryPlus.withQuery(delegateTopNQuery), responseContext); } } } @@ -455,12 +457,12 @@ public QueryRunner> postMergeQueryDecoration(final Query @Override public Sequence> run( - final Query> query, final Map responseContext + final QueryPlus> queryPlus, final Map responseContext ) { // thresholdRunner.run throws ISE if query is not TopNQuery - final Sequence> resultSequence = thresholdRunner.run(query, responseContext); - final TopNQuery topNQuery = (TopNQuery) query; + final Sequence> resultSequence = thresholdRunner.run(queryPlus, responseContext); + final TopNQuery topNQuery = (TopNQuery) queryPlus.getQuery(); if (!TopNQueryEngine.canApplyExtractionInPost(topNQuery)) { return resultSequence; } else { @@ -521,10 +523,11 @@ public ThresholdAdjustingQueryRunner( @Override public Sequence> run( - Query> input, + QueryPlus> queryPlus, Map responseContext ) { + Query> input = queryPlus.getQuery(); if (!(input instanceof TopNQuery)) { throw new ISE("Can only handle [%s], got [%s]", TopNQuery.class, input.getClass()); } @@ -532,13 +535,13 @@ public Sequence> run( final TopNQuery query = (TopNQuery) input; final int minTopNThreshold = query.getContextValue("minTopNThreshold", config.getMinTopNThreshold()); if (query.getThreshold() > minTopNThreshold) { - return runner.run(query, responseContext); + return runner.run(queryPlus, responseContext); } final boolean isBySegment = QueryContexts.isBySegment(query); return Sequences.map( - runner.run(query.withThreshold(minTopNThreshold), responseContext), + runner.run(queryPlus.withQuery(query.withThreshold(minTopNThreshold)), responseContext), new Function, Result>() { @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index 35af25e12a60..2e392f0b75e9 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -25,7 +25,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.query.ChainedExecutionQueryRunner; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -65,15 +65,15 @@ public QueryRunner> createRunner(final Segment segment) { @Override public Sequence> run( - Query> input, + QueryPlus> input, Map responseContext ) { - if (!(input instanceof TopNQuery)) { + if (!(input.getQuery() instanceof TopNQuery)) { throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TopNQuery.class); } - return queryEngine.query((TopNQuery) input, segment.asStorageAdapter()); + return queryEngine.query((TopNQuery) input.getQuery(), segment.asStorageAdapter()); } }; diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java index b6c074e7eeb6..a5761454f635 100644 --- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java @@ -61,7 +61,7 @@ public void testAsyncNature() throws Exception { QueryRunner baseRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { try { latch.await(); @@ -85,7 +85,7 @@ public void testQueryTimeoutHonored() { QueryRunner baseRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { try { Thread.sleep(Long.MAX_VALUE); @@ -117,7 +117,7 @@ public void testQueryRegistration() { QueryRunner baseRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) { return null; } + public Sequence run(QueryPlus queryPlus, Map responseContext) { return null; } }; QueryWatcher mock = EasyMock.createMock(QueryWatcher.class); diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 311aaaa798f9..935957eb079d 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -330,7 +330,7 @@ public DyingQueryRunner(CountDownLatch start, CountDownLatch stop, Queue run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { // do a lot of work synchronized (this) { diff --git a/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java b/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java index 194b12e6bdc5..6ec306db2fc8 100644 --- a/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/IntervalChunkingQueryRunnerTest.java @@ -60,13 +60,13 @@ public void setup() { @Test public void testDefaultNoChunking() { - Query query = queryBuilder.intervals("2014/2016").build(); + QueryPlus queryPlus = QueryPlus.wrap(queryBuilder.intervals("2014/2016").build()); - EasyMock.expect(baseRunner.run(query, Collections.EMPTY_MAP)).andReturn(Sequences.empty()); + EasyMock.expect(baseRunner.run(queryPlus, Collections.EMPTY_MAP)).andReturn(Sequences.empty()); EasyMock.replay(baseRunner); QueryRunner runner = decorator.decorate(baseRunner, toolChest); - runner.run(query, Collections.EMPTY_MAP); + runner.run(queryPlus, Collections.EMPTY_MAP); EasyMock.verify(baseRunner); } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index c1206adae34d..a24d92b5a4e2 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -501,9 +501,9 @@ public static QueryRunner makeUnionQueryRunner( return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return qr.run(query, responseContext); + return qr.run(queryPlus, responseContext); } @Override @@ -526,8 +526,9 @@ public static QueryRunner makeFilteringQueryRunner( new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { + Query query = queryPlus.getQuery(); List segments = Lists.newArrayList(); for (Interval interval : query.getIntervals()) { segments.addAll(timeline.lookup(interval)); @@ -535,7 +536,7 @@ public Sequence run(Query query, Map responseContext) List> sequences = Lists.newArrayList(); for (TimelineObjectHolder holder : toolChest.filterSegments(query, segments)) { Segment segment = holder.getObject().getChunk(0).getObject(); - Query running = query.withQuerySegmentSpec( + QueryPlus queryPlusRunning = queryPlus.withQuerySegmentSpec( new SpecificSegmentSpec( new SegmentDescriptor( holder.getInterval(), @@ -544,7 +545,7 @@ public Sequence run(Query query, Map responseContext) ) ) ); - sequences.add(factory.createRunner(segment).run(running, responseContext)); + sequences.add(factory.createRunner(segment).run(queryPlusRunning, responseContext)); } return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences)); } @@ -568,9 +569,9 @@ public QueryRunner decorate( return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return delegate.run(query, responseContext); + return delegate.run(queryPlus, responseContext); } }; } diff --git a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java index d7ae4b8b7c1c..40f1fff4ad1a 100644 --- a/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/RetryQueryRunnerTest.java @@ -71,7 +71,7 @@ public void testRunWithMissingSegments() throws Exception new QueryRunner>() { @Override - public Sequence> run(Query query, Map context) + public Sequence> run(QueryPlus queryPlus, Map context) { ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( new SegmentDescriptor( @@ -128,7 +128,7 @@ public void testRetry() throws Exception { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map context ) { @@ -195,7 +195,7 @@ public void testRetryMultiple() throws Exception { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map context ) { @@ -261,7 +261,7 @@ public void testException() throws Exception { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map context ) { @@ -313,10 +313,11 @@ public void testNoDuplicateRetry() throws Exception { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map context ) { + final Query> query = queryPlus.getQuery(); if ((int) context.get("count") == 0) { // assume 2 missing segments at first run ((List) context.get(Result.MISSING_SEGMENTS_KEY)).add( diff --git a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java index 2f8ae2181088..133e435ed97f 100644 --- a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java +++ b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java @@ -81,7 +81,7 @@ public void testPostProcess() throws Exception { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { @@ -96,7 +96,7 @@ public Sequence> run( new TimeseriesResultValue(ImmutableMap.of("metric", 3)) ), new Result<>( - query.getIntervals().get(0).getEnd(), + queryPlus.getQuery().getIntervals().get(0).getEnd(), new TimeseriesResultValue(ImmutableMap.of("metric", 5)) ) ) @@ -143,7 +143,7 @@ public Sequence> run( { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { @@ -193,10 +193,11 @@ public void testEmptyFutureInterval() throws Exception { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { + final Query> query = queryPlus.getQuery(); return Sequences.simple( ImmutableList.of( new Result<>( diff --git a/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java index b7b914fe5ab4..250006458ba1 100644 --- a/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java @@ -39,11 +39,11 @@ public void testUnionQueryRunner() QueryRunner baseRunner = new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { // verify that table datasource is passed to baseQueryRunner - Assert.assertTrue(query.getDataSource() instanceof TableDataSource); - String dsName = Iterables.getOnlyElement(query.getDataSource().getNames()); + Assert.assertTrue(queryPlus.getQuery().getDataSource() instanceof TableDataSource); + String dsName = Iterables.getOnlyElement(queryPlus.getQuery().getDataSource().getNames()); if (dsName.equals("ds1")) { responseContext.put("ds1", "ds1"); return Sequences.simple(Arrays.asList(1, 2, 3)); @@ -70,7 +70,7 @@ public Sequence run(Query query, Map responseContext) .aggregators(QueryRunnerTestHelper.commonAggregators) .build(); Map responseContext = Maps.newHashMap(); - Sequence result = runner.run(q, responseContext); + Sequence result = runner.run(q, responseContext); List res = Sequences.toList(result, Lists.newArrayList()); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), res); diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index df73615e9911..a782e39c5b72 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -50,6 +50,7 @@ import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -535,10 +536,10 @@ public QueryRunner makeStringSerdeQueryRunner(final ObjectMapper mapper, fi return new QueryRunner() { @Override - public Sequence run(Query query, Map map) + public Sequence run(QueryPlus queryPlus, Map map) { try { - Sequence resultSeq = baseRunner.run(query, Maps.newHashMap()); + Sequence resultSeq = baseRunner.run(queryPlus, Maps.newHashMap()); final Yielder yielder = resultSeq.toYielder( null, new YieldingAccumulator() @@ -559,7 +560,7 @@ public Object accumulate(Object accumulated, Object in) List resultRows = Lists.transform( readQueryResultArrayFromString(resultStr), toolChest.makePreComputeManipulatorFn( - query, + queryPlus.getQuery(), MetricManipulatorFns.deserializing() ) ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java index dc7648154da2..26df8376601a 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -33,6 +33,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.aggregation.AggregatorFactory; @@ -86,21 +87,22 @@ public void testMergeRunnersEnsureGroupMerging() throws Exception new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return factory.getToolchest().mergeResults( new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { + final Query query = queryPlus.getQuery(); try { return new MergeSequence( query.getResultOrdering(), Sequences.simple( Arrays.asList( - factory.createRunner(createSegment()).run(query, responseContext), - factory.createRunner(createSegment()).run(query, responseContext) + factory.createRunner(createSegment()).run(queryPlus, responseContext), + factory.createRunner(createSegment()).run(queryPlus, responseContext) ) ) ); @@ -110,7 +112,7 @@ public Sequence run(Query query, Map responseContext) } } } - ).run(query, responseContext); + ).run(queryPlus, responseContext); } } ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index dfcc6d2b0982..9171285dcdef 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -51,9 +51,9 @@ import io.druid.query.DruidProcessingConfig; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -2353,20 +2353,20 @@ public void testMergeResults() { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } @@ -2649,20 +2649,20 @@ private void doTestMergeResultsWithOrderBy(LimitSpec orderBySpec, List expe { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } @@ -3437,20 +3437,20 @@ public void testPostAggMergedHavingSpec() { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } @@ -3770,20 +3770,20 @@ public void testMergedHavingSpec() { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } @@ -3879,20 +3879,20 @@ public void testMergedPostAggHavingSpec() { @Override public Sequence run( - Query query, Map responseContext + QueryPlus queryPlus, Map responseContext ) { // simulate two daily segments - final Query query1 = query.withQuerySegmentSpec( + final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03"))) ); - final Query query2 = query.withQuerySegmentSpec( + final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04"))) ); return new MergeSequence( - query.getResultOrdering(), + queryPlus.getQuery().getResultOrdering(), Sequences.simple( - Arrays.asList(runner.run(query1, responseContext), runner.run(query2, responseContext)) + Arrays.asList(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)) ) ); } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 4ac25c7499b7..09d121cedcd9 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -30,7 +30,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -76,9 +76,9 @@ public Object apply(final QueryRunner input) return new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - TimeseriesQuery tsQuery = (TimeseriesQuery) query; + TimeseriesQuery tsQuery = (TimeseriesQuery) queryPlus.getQuery(); QueryRunner newRunner = factory.mergeRunners( MoreExecutors.sameThreadExecutor(), ImmutableList.>of(input) ); diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java index 527a3f0fa662..1d23741d327a 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java @@ -30,6 +30,7 @@ import io.druid.js.JavaScriptConfig; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -168,16 +169,16 @@ public void testSearchWithCardinality() { @Override public Sequence> run( - Query> query, Map responseContext + QueryPlus> queryPlus, Map responseContext ) { - final Query> query1 = searchQuery.withQuerySegmentSpec( + final QueryPlus> queryPlus1 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-01-12/2011-02-28"))) ); - final Query> query2 = searchQuery.withQuerySegmentSpec( + final QueryPlus> queryPlus2 = queryPlus.withQuerySegmentSpec( new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-03-01/2011-04-15"))) ); - return Sequences.concat(runner.run(query1, responseContext), runner.run(query2, responseContext)); + return Sequences.concat(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)); } } ); diff --git a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java index 04bf6d66dc82..b4f5bcc92a9a 100644 --- a/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/spec/SpecificSegmentQueryRunnerTest.java @@ -32,7 +32,7 @@ import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.YieldingAccumulator; import io.druid.query.Druids; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.Result; import io.druid.query.SegmentDescriptor; @@ -68,7 +68,7 @@ public void testRetry() throws Exception new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return new Sequence() { @@ -151,7 +151,7 @@ public void testRetry2() throws Exception new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return Sequences.withEffect( Sequences.simple(Arrays.asList(value)), diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 69a1ea435154..b69072cd543b 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -25,7 +25,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Druids; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -186,11 +186,11 @@ public void testUnionResultMerging() { @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { - if (query.getDataSource().equals(new TableDataSource("ds1"))) { + if (queryPlus.getQuery().getDataSource().equals(new TableDataSource("ds1"))) { return Sequences.simple(descending ? Lists.reverse(ds1) : ds1); } else { return Sequences.simple(descending ? Lists.reverse(ds2) : ds2); diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java index 9e59eb11cde3..0a063aeb59ab 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -27,7 +27,7 @@ import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequence; import io.druid.query.CacheStrategy; -import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -243,11 +243,11 @@ static class MockQueryRunner implements QueryRunner> @Override public Sequence> run( - Query> query, + QueryPlus> queryPlus, Map responseContext ) { - this.query = (TopNQuery) query; + this.query = (TopNQuery) queryPlus.getQuery(); return query.run(runner, responseContext); } } diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index c86123b0ec85..abacb196b105 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -59,6 +59,7 @@ import io.druid.query.CacheStrategy; import io.druid.query.Query; import io.druid.query.QueryContexts; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -139,8 +140,9 @@ public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, Data } @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { + final Query query = queryPlus.getQuery(); final QueryToolChest> toolChest = warehouse.getToolChest(query); final CacheStrategy> strategy = toolChest.getCacheStrategy(query); @@ -429,17 +431,12 @@ private void addSequencesFromServer(ArrayList> listOfSequences) final Sequence resultSeqToAdd; if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable if (!isBySegment) { - resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec), responseContext); + resultSeqToAdd = clientQueryable.run(queryPlus.withQuerySegmentSpec(segmentSpec), responseContext); } else { // bySegment queries need to be de-serialized, see DirectDruidClient.run() - - @SuppressWarnings("unchecked") - final Query>> bySegmentQuery = - (Query>>) ((Query) query); - @SuppressWarnings("unchecked") final Sequence>> resultSequence = clientQueryable.run( - bySegmentQuery.withQuerySegmentSpec(segmentSpec), + queryPlus.withQuerySegmentSpec(segmentSpec), responseContext ); @@ -472,7 +469,7 @@ public Result> apply(Result>> runningSequence = clientQueryable.run( - rewrittenQuery.withQuerySegmentSpec(segmentSpec), + queryPlus.withQuery(rewrittenQuery.withQuerySegmentSpec(segmentSpec)), responseContext ); resultSeqToAdd = new MergeSequence( diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 418098c9d55f..17df72a59967 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -38,6 +38,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.query.CacheStrategy; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SegmentDescriptor; @@ -83,8 +84,9 @@ public CachingQueryRunner( } @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { + Query query = queryPlus.getQuery(); final CacheStrategy strategy = toolChest.getCacheStrategy(query); final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig); final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig); @@ -145,7 +147,7 @@ public void cleanup(Iterator iterFromMake) return Sequences.withEffect( Sequences.map( - base.run(query, responseContext), + base.run(queryPlus, responseContext), new Function() { @Override @@ -190,7 +192,7 @@ public void run() backgroundExecutorService ); } else { - return base.run(query, responseContext); + return base.run(queryPlus, responseContext); } } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index d8dc7882a74b..97b06da6fc41 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -55,6 +55,7 @@ import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -131,8 +132,9 @@ public int getNumOpenConnections() } @Override - public Sequence run(final Query query, final Map context) + public Sequence run(final QueryPlus queryPlus, final Map context) { + final Query query = queryPlus.getQuery(); QueryToolChest> toolChest = warehouse.getToolChest(query); boolean isBySegment = QueryContexts.isBySegment(query); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java index 4f94c4f3c3f1..411d7500f930 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java @@ -31,16 +31,16 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.metamx.emitter.EmittingLogger; - import io.druid.common.guava.ThreadRenamingCallable; import io.druid.concurrent.Execs; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; -import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.ISE; import io.druid.java.util.common.concurrent.ScheduledExecutors; +import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; @@ -54,7 +54,6 @@ import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; - import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; @@ -169,9 +168,9 @@ public QueryRunner getQueryRunner(final Query query) return new QueryRunner() { @Override - public Sequence run(final Query query, final Map responseContext) + public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - return query.run(appenderator, responseContext); + return queryPlus.run(appenderator, responseContext); } }; } diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 554cc34de83c..023429fc7453 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -42,6 +42,7 @@ import io.druid.query.Query; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -226,7 +227,7 @@ public Response doPost( } final Map responseContext = new MapMaker().makeMap(); - final Sequence res = query.run(texasRanger, responseContext); + final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext); if (prevEtag != null && prevEtag.equals(responseContext.get(HDR_ETAG))) { return Response.notModified().build(); diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index b81f1ee1b9a8..54486c9c578d 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -71,6 +71,7 @@ import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -1781,7 +1782,7 @@ client, new TimeseriesQueryQueryToolChest( timeline.add(interval2, "v", new StringPartitionChunk<>("d", null, 5, selector5)); timeline.add(interval3, "v", new StringPartitionChunk<>(null, null, 6, selector6)); - final Capture capture = Capture.newInstance(); + final Capture capture = Capture.newInstance(); final Capture> contextCap = Capture.newInstance(); QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class); @@ -1801,12 +1802,9 @@ client, new TimeseriesQueryQueryToolChest( descriptors.add(new SegmentDescriptor(interval3, "v", 6)); MultipleSpecificSegmentSpec expected = new MultipleSpecificSegmentSpec(descriptors); - Sequences.toList(runner.run( - query, - context - ), Lists.newArrayList()); + Sequences.toList(runner.run(QueryPlus.wrap(query), context), Lists.newArrayList()); - Assert.assertEquals(expected, capture.getValue().getQuerySegmentSpec()); + Assert.assertEquals(expected, ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec()); } private ServerSelector makeMockSingleDimensionSelector( @@ -1923,7 +1921,7 @@ public void testQueryCachingWithFilter( .andReturn(expectations.getQueryRunner()) .times(0, 1); - final Capture capture = new Capture(); + final Capture capture = new Capture(); final Capture context = new Capture(); QueryRunner queryable = expectations.getQueryRunner(); @@ -1940,7 +1938,7 @@ public void testQueryCachingWithFilter( @Override public Sequence answer() throws Throwable { - return toFilteredQueryableTimeseriesResults((TimeseriesQuery)capture.getValue(), segmentIds, queryIntervals, results); + return toFilteredQueryableTimeseriesResults((TimeseriesQuery)capture.getValue().getQuery(), segmentIds, queryIntervals, results); } }) .times(0, 1); @@ -1965,10 +1963,12 @@ public void run() TestHelper.assertExpectedResults( expected, runner.run( - query.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec( - ImmutableList.of( - actualQueryInterval + QueryPlus.wrap( + query.withQuerySegmentSpec( + new MultipleIntervalSegmentSpec( + ImmutableList.of( + actualQueryInterval + ) ) ) ), @@ -2062,7 +2062,7 @@ public void testQueryCaching( .andReturn(expectations.getQueryRunner()) .once(); - final Capture capture = new Capture(); + final Capture capture = new Capture(); final Capture context = new Capture(); queryCaptures.add(capture); QueryRunner queryable = expectations.getQueryRunner(); @@ -2210,7 +2210,8 @@ public Iterable>> apply(@Nullable Integer input) // make sure all the queries were sent down as 'bySegment' for (Capture queryCapture : queryCaptures) { - Query capturedQuery = (Query) queryCapture.getValue(); + QueryPlus capturedQueryPlus = (QueryPlus) queryCapture.getValue(); + Query capturedQuery = capturedQueryPlus.getQuery(); if (expectBySegment) { Assert.assertEquals(true, capturedQuery.getContextValue("bySegment")); } else { diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index af57a5e97acb..d2e2d365c911 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -39,6 +39,7 @@ import io.druid.query.CacheStrategy; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryToolChest; @@ -269,7 +270,7 @@ public void doMonitor(ServiceEmitter emitter) new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return resultSeq; } @@ -362,7 +363,7 @@ private void testUseCache( new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { return Sequences.empty(); } diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 7656e7583af9..502156e5e907 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -47,6 +47,7 @@ import io.druid.query.NoopQueryRunner; import io.druid.query.Query; import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -679,9 +680,9 @@ public BlockingQueryRunner( } @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { - return new BlockingSequence(runner.run(query, responseContext), waitLatch, waitYieldLatch, notifyLatch); + return new BlockingSequence<>(runner.run(queryPlus, responseContext), waitLatch, waitYieldLatch, notifyLatch); } } diff --git a/services/src/main/java/io/druid/cli/DumpSegment.java b/services/src/main/java/io/druid/cli/DumpSegment.java index fa14574cfca8..8901a4401909 100644 --- a/services/src/main/java/io/druid/cli/DumpSegment.java +++ b/services/src/main/java/io/druid/cli/DumpSegment.java @@ -51,6 +51,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.query.DruidProcessingConfig; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -471,7 +472,7 @@ private static Sequence executeQuery(final Injector injector, final Query final QueryRunner runner = factory.createRunner(new QueryableIndexSegment("segment", index)); final Sequence results = factory.getToolchest().mergeResults( factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner)) - ).run(query, Maps.newHashMap()); + ).run(QueryPlus.wrap(query), Maps.newHashMap()); return (Sequence) results; } diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java index 5cbf91d4b182..c0eba66180d7 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java @@ -32,6 +32,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.DataSource; import io.druid.query.QueryDataSource; +import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; import io.druid.query.Result; import io.druid.query.dimension.DimensionSpec; @@ -193,7 +194,7 @@ public Sequence next() return Sequences.concat( Sequences.map( - queryWithPagination.run(walker, Maps.newHashMap()), + QueryPlus.wrap(queryWithPagination).run(walker, Maps.newHashMap()), new Function, Sequence>() { @Override @@ -264,7 +265,7 @@ private Sequence executeTimeseries( Hook.QUERY_PLAN.run(query); return Sequences.map( - query.run(walker, Maps.newHashMap()), + QueryPlus.wrap(query).run(walker, Maps.newHashMap()), new Function, Object[]>() { @Override @@ -299,7 +300,7 @@ private Sequence executeTopN( return Sequences.concat( Sequences.map( - query.run(walker, Maps.newHashMap()), + QueryPlus.wrap(query).run(walker, Maps.newHashMap()), new Function, Sequence>() { @Override @@ -335,7 +336,7 @@ private Sequence executeGroupBy( Hook.QUERY_PLAN.run(query); return Sequences.map( - query.run(walker, Maps.newHashMap()), + QueryPlus.wrap(query).run(walker, Maps.newHashMap()), new Function() { @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java index 38f82c8213c7..e5cd0da46c19 100644 --- a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java @@ -42,6 +42,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; import io.druid.query.TableDataSource; import io.druid.query.metadata.metadata.ColumnAnalysis; @@ -305,7 +306,7 @@ private DruidTable computeTable(final String dataSource) true ); - final Sequence sequence = segmentMetadataQuery.run(walker, Maps.newHashMap()); + final Sequence sequence = QueryPlus.wrap(segmentMetadataQuery).run(walker, Maps.newHashMap()); final List results = Sequences.toList(sequence, Lists.newArrayList()); if (results.isEmpty()) { return null; diff --git a/sql/src/test/java/io/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/io/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index c9b88af530a2..a96596555d6b 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -32,6 +32,7 @@ import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -110,8 +111,9 @@ public QueryRunner getQueryRunnerForIntervals( new QueryRunner() { @Override - public Sequence run(Query query, Map responseContext) + public Sequence run(QueryPlus queryPlus, Map responseContext) { + Query query = queryPlus.getQuery(); final VersionedIntervalTimeline timeline = getTimelineForTableDataSource(query); return makeBaseRunner( query, @@ -154,7 +156,7 @@ public SegmentDescriptor apply(final PartitionChunk chunk) } } ) - ).run(query, responseContext); + ).run(queryPlus, responseContext); } } ) From d9b919e0b4860ecdbcbbf53fd92a2807f17b5947 Mon Sep 17 00:00:00 2001 From: leventov Date: Tue, 25 Apr 2017 17:43:03 +0300 Subject: [PATCH 2/6] Fix GroupByMergingQueryRunnerV2 --- .../groupby/epinephelinae/GroupByMergingQueryRunnerV2.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 549f27becdc1..242915ddf13f 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -124,8 +124,8 @@ public Sequence run(final QueryPlus queryPlus, final Map runner = new ChainedExecutionQueryRunner<>(exec, queryWatcher, queryables); + return runner.run(queryPlusForRunners, responseContext); } final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded(); From bc47882479017bc0898e150f22c1c0226f1d91ed Mon Sep 17 00:00:00 2001 From: leventov Date: Tue, 25 Apr 2017 17:43:26 +0300 Subject: [PATCH 3/6] Fix QueryResourceTest --- processing/src/main/java/io/druid/query/BaseQuery.java | 1 + .../src/test/java/io/druid/server/QueryResourceTest.java | 7 +++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index edb1ca5bf32e..cac39441de4f 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -92,6 +92,7 @@ public Sequence run(QuerySegmentWalker walker, Map context) return run(querySegmentSpec.lookup(this, walker), context); } + @Override public Sequence run(QueryRunner runner, Map context) { return runner.run(this, context); diff --git a/server/src/test/java/io/druid/server/QueryResourceTest.java b/server/src/test/java/io/druid/server/QueryResourceTest.java index f8ca466b97ac..3a4573f7cc97 100644 --- a/server/src/test/java/io/druid/server/QueryResourceTest.java +++ b/server/src/test/java/io/druid/server/QueryResourceTest.java @@ -32,6 +32,7 @@ import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.Query; +import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; @@ -95,11 +96,9 @@ public QueryRunner getQueryRunnerForIntervals( return new QueryRunner() { @Override - public Sequence run( - Query query, Map responseContext - ) + public Sequence run(QueryPlus query, Map responseContext) { - return Sequences.empty(); + return Sequences.empty(); } }; } From 73cb936d9ea4b327fde871e99c18ea55bf94150b Mon Sep 17 00:00:00 2001 From: leventov Date: Thu, 27 Apr 2017 00:33:58 +0300 Subject: [PATCH 4/6] Expand the comment to Query.run(walker, context) --- processing/src/main/java/io/druid/query/Query.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 37bda121249f..5d14ed133af2 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -72,7 +72,9 @@ public interface Query /** * @deprecated use {@link QueryPlus#run(QuerySegmentWalker, Map)} instead. This method could be removed in the next - * minor or major version of Druid. + * minor or major version of Druid. In the future, a method like getRunner(QuerySegmentWalker, Map) could be added + * instead of this method, so that {@link QueryPlus#run(QuerySegmentWalker, Map)} could be implemented as {@code + * this.query.getRunner(walker, context).run(this, context))}. */ @Deprecated Sequence run(QuerySegmentWalker walker, Map context); From 979ea64b5fe9f5ede22cda0330081a5ec86469a3 Mon Sep 17 00:00:00 2001 From: leventov Date: Mon, 1 May 2017 20:50:39 +0300 Subject: [PATCH 5/6] Remove legacy version of BySegmentSkippingQueryRunner.doRun() --- .../druid/query/BySegmentSkippingQueryRunner.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index 9c103a29d747..5dda6f618a3c 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -47,17 +47,5 @@ public Sequence run(QueryPlus queryPlus, Map responseConte return doRun(baseRunner, queryPlus, responseContext); } - /** - * @deprecated override {@link #doRun(QueryRunner, QueryPlus, Map)} instead - */ - @Deprecated - protected Sequence doRun(QueryRunner baseRunner, Query query, Map context) - { - return doRun(baseRunner, QueryPlus.wrap(query), context); - } - - protected Sequence doRun(QueryRunner baseRunner, QueryPlus queryPlus, Map context) - { - return doRun(baseRunner, queryPlus.getQuery(), context); - } + protected abstract Sequence doRun(QueryRunner baseRunner, QueryPlus queryPlus, Map context); } From b389b9cbce6e7101dd0c2a7827e9dd2e2a0d0147 Mon Sep 17 00:00:00 2001 From: leventov Date: Mon, 8 May 2017 17:09:22 -0500 Subject: [PATCH 6/6] Add LegacyApiQueryRunnerTest and be more specific about legacy API removal plans in Druid 0.11 in Javadocs --- .../src/main/java/io/druid/query/Query.java | 9 +-- .../main/java/io/druid/query/QueryRunner.java | 3 +- .../druid/query/LegacyApiQueryRunnerTest.java | 64 ++++++++++++++++ .../io/druid/query/QueryContextsTest.java | 52 ------------- .../test/java/io/druid/query/TestQuery.java | 75 +++++++++++++++++++ 5 files changed, 144 insertions(+), 59 deletions(-) create mode 100644 processing/src/test/java/io/druid/query/LegacyApiQueryRunnerTest.java create mode 100644 processing/src/test/java/io/druid/query/TestQuery.java diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 5d14ed133af2..4e6745ca6d43 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -71,17 +71,16 @@ public interface Query String getType(); /** - * @deprecated use {@link QueryPlus#run(QuerySegmentWalker, Map)} instead. This method could be removed in the next - * minor or major version of Druid. In the future, a method like getRunner(QuerySegmentWalker, Map) could be added - * instead of this method, so that {@link QueryPlus#run(QuerySegmentWalker, Map)} could be implemented as {@code + * @deprecated use {@link QueryPlus#run(QuerySegmentWalker, Map)} instead. This method is going to be removed in Druid + * 0.11. In the future, a method like getRunner(QuerySegmentWalker, Map) could be added instead of this method, so + * that {@link QueryPlus#run(QuerySegmentWalker, Map)} could be implemented as {@code * this.query.getRunner(walker, context).run(this, context))}. */ @Deprecated Sequence run(QuerySegmentWalker walker, Map context); /** - * @deprecated use {@link QueryRunner#run(QueryPlus, Map)} instead. This method could be removed in the next minor or - * major version of Druid. + * @deprecated use {@link QueryRunner#run(QueryPlus, Map)} instead. This method is going to be removed in Druid 0.11. */ @Deprecated Sequence run(QueryRunner runner, Map context); diff --git a/processing/src/main/java/io/druid/query/QueryRunner.java b/processing/src/main/java/io/druid/query/QueryRunner.java index dc8b260a58d9..a7e17f43d242 100644 --- a/processing/src/main/java/io/druid/query/QueryRunner.java +++ b/processing/src/main/java/io/druid/query/QueryRunner.java @@ -31,8 +31,7 @@ public interface QueryRunner { /** - * @deprecated use and override {@link #run(QueryPlus, Map)} instead. This method could be removed in the next minor - * or major version of Druid. + * @deprecated use and override {@link #run(QueryPlus, Map)} instead. This method is going to be removed in Druid 0.11 */ @Deprecated default Sequence run(Query query, Map responseContext) diff --git a/processing/src/test/java/io/druid/query/LegacyApiQueryRunnerTest.java b/processing/src/test/java/io/druid/query/LegacyApiQueryRunnerTest.java new file mode 100644 index 000000000000..31bf7825fc4c --- /dev/null +++ b/processing/src/test/java/io/druid/query/LegacyApiQueryRunnerTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.google.common.collect.ImmutableList; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tests that if a QueryRunner overrides a legacy {@link QueryRunner#run(Query, Map)} method, it still works. This + * test should be removed when {@link QueryRunner#run(Query, Map)} is removed. + */ +public class LegacyApiQueryRunnerTest +{ + private static class LegacyApiQueryRunner implements QueryRunner + { + /** + * Overrides legacy API. + */ + @Override + public Sequence run(Query query, Map responseContext) + { + return Sequences.empty(); + } + } + + @Test + public void testQueryRunnerLegacyApi() + { + final Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + new HashMap() + ); + + Map context = new HashMap<>(); + Assert.assertEquals(Sequences.empty(), new LegacyApiQueryRunner<>().run(QueryPlus.wrap(query), context)); + } +} diff --git a/processing/src/test/java/io/druid/query/QueryContextsTest.java b/processing/src/test/java/io/druid/query/QueryContextsTest.java index 2a6945b9d3cf..81f23374a21e 100644 --- a/processing/src/test/java/io/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/io/druid/query/QueryContextsTest.java @@ -21,67 +21,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import io.druid.query.filter.DimFilter; import io.druid.query.spec.MultipleIntervalSegmentSpec; -import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; import java.util.HashMap; -import java.util.Map; public class QueryContextsTest { - private static class TestQuery extends BaseQuery - { - - public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context) - { - super(dataSource, querySegmentSpec, descending, context); - } - - @Override - public boolean hasFilters() - { - return false; - } - - @Override - public DimFilter getFilter() - { - return null; - } - - @Override - public String getType() - { - return null; - } - - @Override - public Query withQuerySegmentSpec(QuerySegmentSpec spec) - { - return null; - } - - @Override - public Query withDataSource(DataSource dataSource) - { - return null; - } - - @Override - public Query withOverriddenContext(Map contextOverride) - { - return new TestQuery( - getDataSource(), - getQuerySegmentSpec(), - isDescending(), - BaseQuery.computeOverriddenContext(getContext(), contextOverride) - ); - } - } @Test public void testDefaultQueryTimeout() diff --git a/processing/src/test/java/io/druid/query/TestQuery.java b/processing/src/test/java/io/druid/query/TestQuery.java new file mode 100644 index 000000000000..bd478cc622c5 --- /dev/null +++ b/processing/src/test/java/io/druid/query/TestQuery.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import io.druid.query.filter.DimFilter; +import io.druid.query.spec.QuerySegmentSpec; + +import java.util.Map; + +class TestQuery extends BaseQuery +{ + + public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context) + { + super(dataSource, querySegmentSpec, descending, context); + } + + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec spec) + { + return null; + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return null; + } + + @Override + public Query withOverriddenContext(Map contextOverride) + { + return new TestQuery( + getDataSource(), + getQuerySegmentSpec(), + isDescending(), + BaseQuery.computeOverriddenContext(getContext(), contextOverride) + ); + } +}