From 5e261be18af52cee0bd0229e2d562ef720dbc114 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 22 Jan 2020 15:09:40 -0800 Subject: [PATCH] Use DataSourceAnalysis throughout the query stack. Builds on #9235, using the datasource analysis functionality to replace various ad-hoc approaches. The most interesting changes are in ClientQuerySegmentWalker (brokers), ServerManager (historicals), and SinkQuerySegmentWalker (indexing tasks). Other changes related to improving how we analyze queries: 1) Changes TimelineServerView to return an Optional timeline, which I thought made the analysis changes cleaner to implement. 2) Added QueryToolChest#canPerformSubquery, which is now used by query entry points to determine whether it is safe to pass a subquery dataSource to the query toolchest. Fixes an issue introduced in #5471 where subqueries under non-groupBy-typed queries were silently ignored, since neither the query entry point nor the toolchest did anything special with them. 3) Removes the QueryPlus.withQuerySegmentSpec method, which was mostly being used in error-prone ways (ignoring any potential subqueries, and not verifying that the underlying data source is actually a table). Replaces it with a new function, Queries.withSpecificSegments, that includes sanity checks. 
--- .../CachingClusteredClientBenchmark.java | 30 ++- .../materializedview/DataSourceOptimizer.java | 37 ++-- .../movingaverage/MovingAverageQueryTest.java | 7 +- .../overlord/SingleTaskBackgroundRunner.java | 10 +- .../org/apache/druid/query/BaseQuery.java | 15 +- .../java/org/apache/druid/query/Queries.java | 48 ++++- .../java/org/apache/druid/query/Query.java | 16 +- .../org/apache/druid/query/QueryPlus.java | 9 - .../druid/query/QuerySegmentWalker.java | 17 +- .../apache/druid/query/QueryToolChest.java | 14 ++ .../apache/druid/query/RetryQueryRunner.java | 7 +- .../apache/druid/query/TimewarpOperator.java | 7 +- .../druid/query/filter/DimFilterUtils.java | 29 ++- .../groupby/GroupByQueryQueryToolChest.java | 20 ++ .../spec/MultipleSpecificSegmentSpec.java | 16 +- .../spec/SpecificSegmentQueryRunner.java | 10 +- .../org/apache/druid/query/QueriesTest.java | 121 +++++++++++ .../druid/query/QueryRunnerTestHelper.java | 14 +- .../query/filter/DimFilterUtilsTest.java | 2 +- .../GroupByQueryQueryToolChestTest.java | 60 ++++++ .../query/groupby/GroupByQueryRunnerTest.java | 108 ++++++---- .../query/search/SearchQueryRunnerTest.java | 13 +- .../apache/druid/client/BrokerServerView.java | 18 +- .../druid/client/CachingClusteredClient.java | 53 +++-- .../apache/druid/client/ServerViewUtil.java | 9 +- .../druid/client/TimelineServerView.java | 17 +- .../appenderator/SinkQuerySegmentWalker.java | 68 +++--- .../druid/server/ClientInfoResource.java | 17 +- .../server/ClientQuerySegmentWalker.java | 29 ++- .../apache/druid/server/SegmentManager.java | 28 ++- .../server/coordination/ServerManager.java | 196 ++++++------------ .../druid/client/BrokerServerViewTest.java | 13 +- ...chingClusteredClientFunctionalityTest.java | 9 +- .../client/CachingClusteredClientTest.java | 7 +- .../druid/server/ClientInfoResourceTest.java | 6 +- .../druid/server/SegmentManagerTest.java | 13 +- .../druid/sql/calcite/rel/QueryMaker.java | 18 +- .../calcite/util/TestServerInventoryView.java | 5 +- 
38 files changed, 711 insertions(+), 405 deletions(-) diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index d7b8b63bedfe..72fefa93946a 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -25,7 +25,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo; import org.apache.druid.benchmark.datagen.BenchmarkSchemas; @@ -58,7 +57,6 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BySegmentQueryRunner; -import org.apache.druid.query.DataSource; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; @@ -89,6 +87,7 @@ import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.query.groupby.strategy.GroupByStrategyV1; import org.apache.druid.query.groupby.strategy.GroupByStrategyV2; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; @@ -126,7 +125,6 @@ import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; @@ -134,6 +132,7 @@ import java.util.List; import java.util.Map; import 
java.util.Map.Entry; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; @@ -217,8 +216,17 @@ public void setup() .size(0) .build(); final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator()); - LOG.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment); - final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment); + LOG.info( + "Starting benchmark setup using cacheDir[%s], rows[%,d].", + segmentGenerator.getCacheDir(), + rowsPerSegment + ); + final QueryableIndex index = segmentGenerator.generate( + dataSegment, + schemaInfo, + Granularities.NONE, + rowsPerSegment + ); queryableIndexes.put(dataSegment, index); } @@ -518,12 +526,10 @@ void addSegmentToServer(DruidServer server, DataSegment segment) .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); } - @Nullable @Override - public TimelineLookup getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - final String table = Iterables.getOnlyElement(dataSource.getTableNames()); - return timelines.get(table); + return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName())); } @Override @@ -563,7 +569,11 @@ private class SimpleQueryRunner implements QueryRunner private final QueryRunnerFactoryConglomerate conglomerate; private final QueryableIndexSegment segment; - public SimpleQueryRunner(QueryRunnerFactoryConglomerate conglomerate, SegmentId segmentId, QueryableIndex queryableIndex) + public SimpleQueryRunner( + QueryRunnerFactoryConglomerate conglomerate, + SegmentId segmentId, + QueryableIndex queryableIndex + ) { this.conglomerate = conglomerate; this.segment = new QueryableIndexSegment(queryableIndex, segmentId); diff --git 
a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java index bbd6aa8a4f2b..1653539f3e36 100644 --- a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java +++ b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java @@ -23,9 +23,11 @@ import com.google.common.collect.ImmutableSortedSet; import com.google.inject.Inject; import org.apache.druid.client.TimelineServerView; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.Query; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.topn.TopNQuery; @@ -54,24 +56,24 @@ public class DataSourceOptimizer private ConcurrentHashMap hitCount = new ConcurrentHashMap<>(); private ConcurrentHashMap costTime = new ConcurrentHashMap<>(); private ConcurrentHashMap, AtomicLong>> missFields = new ConcurrentHashMap<>(); - + @Inject - public DataSourceOptimizer(TimelineServerView serverView) + public DataSourceOptimizer(TimelineServerView serverView) { this.serverView = serverView; } /** * Do main work about materialized view selection: transform user query to one or more sub-queries. 
- * - * In the sub-query, the dataSource is the derivative of dataSource in user query, and sum of all sub-queries' + * + * In the sub-query, the dataSource is the derivative of dataSource in user query, and sum of all sub-queries' * intervals equals the interval in user query - * + * * Derived dataSource with smallest average data size per segment granularity have highest priority to replace the * datasource in user query - * + * * @param query only TopNQuery/TimeseriesQuery/GroupByQuery can be optimized - * @return a list of queries with specified derived dataSources and intervals + * @return a list of queries with specified derived dataSources and intervals */ public List optimize(Query query) { @@ -86,7 +88,7 @@ public List optimize(Query query) // get all derivatives for datasource in query. The derivatives set is sorted by average size of // per segment granularity. Set derivatives = DerivativeDataSourceManager.getDerivatives(datasourceName); - + if (derivatives.isEmpty()) { return Collections.singletonList(query); } @@ -96,10 +98,10 @@ public List optimize(Query query) hitCount.putIfAbsent(datasourceName, new AtomicLong(0)); costTime.putIfAbsent(datasourceName, new AtomicLong(0)); totalCount.get(datasourceName).incrementAndGet(); - + // get all fields which the query required Set requiredFields = MaterializedViewUtils.getRequiredFields(query); - + Set derivativesWithRequiredFields = new HashSet<>(); for (DerivativeDataSource derivativeDataSource : derivatives) { derivativesHitCount.putIfAbsent(derivativeDataSource.getName(), new AtomicLong(0)); @@ -115,14 +117,15 @@ public List optimize(Query query) costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start); return Collections.singletonList(query); } - + List queries = new ArrayList<>(); List remainingQueryIntervals = (List) query.getIntervals(); - + for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) { final List derivativeIntervals = 
remainingQueryIntervals.stream() .flatMap(interval -> serverView - .getTimeline((new TableDataSource(derivativeDataSource.getName()))) + .getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(derivativeDataSource.getName()))) + .orElseThrow(() -> new ISE("No timeline for dataSource: %s", derivativeDataSource.getName())) .lookup(interval) .stream() .map(TimelineObjectHolder::getInterval) @@ -133,7 +136,7 @@ public List optimize(Query query) if (derivativeIntervals.isEmpty()) { continue; } - + remainingQueryIntervals = MaterializedViewUtils.minus(remainingQueryIntervals, derivativeIntervals); queries.add( query.withDataSource(new TableDataSource(derivativeDataSource.getName())) @@ -158,13 +161,13 @@ public List optimize(Query query) hitCount.get(datasourceName).incrementAndGet(); costTime.get(datasourceName).addAndGet(System.currentTimeMillis() - start); return queries; - } + } finally { lock.readLock().unlock(); } } - public List getAndResetStats() + public List getAndResetStats() { ImmutableMap derivativesHitCountSnapshot; ImmutableMap totalCountSnapshot; @@ -183,7 +186,7 @@ public List getAndResetStats() hitCount.clear(); costTime.clear(); missFields.clear(); - } + } finally { lock.writeLock().unlock(); } diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 83881c79cf97..9090bfe168d8 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -49,7 +49,6 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import 
org.apache.druid.query.DataSource; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; @@ -62,6 +61,7 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.movingaverage.test.TestConfig; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.server.ClientQuerySegmentWalker; @@ -84,6 +84,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; @@ -305,9 +306,9 @@ public void testQuery() throws IOException new TimelineServerView() { @Override - public TimelineLookup getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - return null; + return Optional.empty(); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java index ecb5d9ab03da..7e7fafac3e8b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java @@ -22,7 +22,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -51,6 +50,7 @@ import org.apache.druid.query.QueryRunner; import 
org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.DruidNode; import org.apache.druid.server.SetAndVerifyContextQueryRunner; import org.apache.druid.server.initialization.ServerConfig; @@ -328,11 +328,13 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable QueryRunner getQueryRunnerImpl(Query query) { QueryRunner queryRunner = null; - final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getTableNames()); if (runningItem != null) { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); final Task task = runningItem.getTask(); - if (task.getDataSource().equals(queryDataSource)) { + + if (analysis.getBaseTableDataSource().isPresent() + && task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName())) { final QueryRunner taskQueryRunner = task.getQueryRunner(query); if (taskQueryRunner != null) { @@ -379,7 +381,7 @@ public String getTaskType() { return task.getType(); } - + @Override public String getDataSource() { diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java b/processing/src/main/java/org/apache/druid/query/BaseQuery.java index cc2cd6df6b4c..70fbdf992707 100644 --- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java +++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.QuerySegmentSpec; import org.joda.time.DateTimeZone; import org.joda.time.Duration; @@ -117,17 +118,11 @@ public QueryRunner getRunner(QuerySegmentWalker walker) } @VisibleForTesting - public static 
QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery query) + public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery query) { - if (query.getDataSource() instanceof QueryDataSource) { - QueryDataSource ds = (QueryDataSource) query.getDataSource(); - Query subquery = ds.getQuery(); - if (subquery instanceof BaseQuery) { - return getQuerySegmentSpecForLookUp((BaseQuery) subquery); - } - throw new IllegalStateException("Invalid subquery type " + subquery.getClass()); - } - return query.getQuerySegmentSpec(); + return DataSourceAnalysis.forDataSource(query.getDataSource()) + .getBaseQuerySegmentSpec() + .orElse(query.getQuerySegmentSpec()); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/Queries.java b/processing/src/main/java/org/apache/druid/query/Queries.java index 37408a4aea3f..a2ff0051bf94 100644 --- a/processing/src/main/java/org/apache/druid/query/Queries.java +++ b/processing/src/main/java/org/apache/druid/query/Queries.java @@ -23,8 +23,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.druid.guice.annotations.PublicApi; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import java.util.Collections; import java.util.HashMap; @@ -33,9 +36,6 @@ import java.util.Map; import java.util.Set; -/** - * - */ @PublicApi public class Queries { @@ -131,4 +131,46 @@ public static List prepareAggregations( return postAggs; } + + /** + * Rewrite "query" to refer to some specific segment descriptors. + * + * The dataSource for "query" must be based on a single table for this operation to be valid. Otherwise, this + * function will throw an exception. 
+ * + * Unlike the seemingly-similar {@code query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(descriptors))}, + * this method will walk down subqueries found within the query datasource, if any, and modify the lowest-level + * subquery. The effect is that + * {@code DataSourceAnalysis.forDataSource(query.getDataSource()).getBaseQuerySegmentSpec()} is guaranteed to return + * either {@code new MultipleSpecificSegmentSpec(descriptors)} or empty. + * + * Because {@link BaseQuery#getRunner} is implemented using {@link DataSourceAnalysis#getBaseQuerySegmentSpec}, this + * method will cause the runner to be a specific-segments runner. + */ + public static Query withSpecificSegments(final Query query, final List descriptors) + { + final Query retVal; + + if (query.getDataSource() instanceof QueryDataSource) { + final Query subQuery = ((QueryDataSource) query.getDataSource()).getQuery(); + retVal = query.withDataSource(new QueryDataSource(withSpecificSegments(subQuery, descriptors))); + } else { + retVal = query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(descriptors)); + } + + // Verify preconditions and invariants, just in case. + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(retVal.getDataSource()); + + if (!analysis.getBaseTableDataSource().isPresent()) { + throw new ISE("Unable to apply specific segments to non-table-based dataSource[%s]", query.getDataSource()); + } + + if (analysis.getBaseQuerySegmentSpec().isPresent() + && !analysis.getBaseQuerySegmentSpec().get().equals(new MultipleSpecificSegmentSpec(descriptors))) { + // If you see the error message below, it's a bug in either this function or in DataSourceAnalysis. 
+ throw new ISE("Unable to apply specific segments to query with dataSource[%s]", query.getDataSource()); + } + + return retVal; + } } diff --git a/processing/src/main/java/org/apache/druid/query/Query.java b/processing/src/main/java/org/apache/druid/query/Query.java index 387509839bb5..37868b4a25d4 100644 --- a/processing/src/main/java/org/apache/druid/query/Query.java +++ b/processing/src/main/java/org/apache/druid/query/Query.java @@ -116,6 +116,12 @@ public interface Query Query withOverriddenContext(Map contextOverride); + /** + * Returns a new query, identical to this one, but with a different associated {@link QuerySegmentSpec}. + * + * This often changes the behavior of {@link #getRunner(QuerySegmentWalker)}, since most queries inherit that method + * from {@link BaseQuery}, which implements it by calling {@link QuerySegmentSpec#lookup}. + */ Query withQuerySegmentSpec(QuerySegmentSpec spec); Query withId(String id); @@ -140,14 +146,4 @@ default Query optimizeForSegment(PerSegmentQueryOptimizationContext optimizat { return this; } - - default List getIntervalsOfInnerMostQuery() - { - if (getDataSource() instanceof QueryDataSource) { - //noinspection unchecked - return ((QueryDataSource) getDataSource()).getQuery().getIntervalsOfInnerMostQuery(); - } else { - return getIntervals(); - } - } } diff --git a/processing/src/main/java/org/apache/druid/query/QueryPlus.java b/processing/src/main/java/org/apache/druid/query/QueryPlus.java index f1884d356242..1b18e9439099 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryPlus.java +++ b/processing/src/main/java/org/apache/druid/query/QueryPlus.java @@ -24,7 +24,6 @@ import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.spec.QuerySegmentSpec; import javax.annotation.Nullable; @@ -125,14 +124,6 @@ private QueryPlus withoutQueryMetrics() } } - /** - * Equivalent of 
withQuery(getQuery().withQuerySegmentSpec(spec)). - */ - public QueryPlus withQuerySegmentSpec(QuerySegmentSpec spec) - { - return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics, identity); - } - /** * Equivalent of withQuery(getQuery().withOverriddenContext(ImmutableMap.of(MAX_QUEUED_BYTES_KEY, maxQueuedBytes))). */ diff --git a/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java b/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java index 970fb6e28c08..7084a80935d1 100644 --- a/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java +++ b/processing/src/main/java/org/apache/druid/query/QuerySegmentWalker.java @@ -22,6 +22,7 @@ import org.joda.time.Interval; /** + * An interface for query-handling entry points. */ public interface QuerySegmentWalker { @@ -29,19 +30,27 @@ public interface QuerySegmentWalker * Gets the Queryable for a given interval, the Queryable returned can be any version(s) or partitionNumber(s) * such that it represents the interval. * - * @param query result type - * @param query the query to find a Queryable for + * @param query result type + * @param query the query to find a Queryable for * @param intervals the intervals to find a Queryable for + * * @return a Queryable object that represents the interval */ QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals); /** - * Gets the Queryable for a given list of SegmentSpecs. + * Gets the Queryable for a given list of SegmentDescriptors. + * + * The descriptors are expected to apply to the base datasource involved in the query, i.e. the one returned by: * - * @param the query result type + *
+   *   DataSourceAnalysis.forDataSource(query.getDataSource()).getBaseDataSource()
+   * 
+ * + * @param the query result type * @param query the query to return a Queryable for * @param specs the list of SegmentSpecs to find a Queryable for + * * @return the Queryable object with the given SegmentSpecs */ QueryRunner getQueryRunnerForSegments(Query query, Iterable specs); diff --git a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java index c271af8e4d5c..b72d9d76eb4e 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/QueryToolChest.java @@ -271,6 +271,20 @@ public List filterSegments(QueryType query, List subquery) + { + return false; + } + /** * Returns a list of field names in the order that {@link #resultsAsArrays} would return them. The returned list will * be the same length as each array returned by {@link #resultsAsArrays}. diff --git a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java b/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java index 6b991b870575..fa337d047899 100644 --- a/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/RetryQueryRunner.java @@ -30,7 +30,6 @@ import org.apache.druid.java.util.common.guava.YieldingSequenceBase; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.SegmentMissingException; import java.util.ArrayList; @@ -73,10 +72,8 @@ public Yielder toYielder(OutType initValue, YieldingAccumulat log.info("[%,d] missing segments found. 
Retry attempt [%,d]", missingSegments.size(), i); context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>()); - final QueryPlus retryQueryPlus = queryPlus.withQuerySegmentSpec( - new MultipleSpecificSegmentSpec( - missingSegments - ) + final QueryPlus retryQueryPlus = queryPlus.withQuery( + Queries.withSpecificSegments(queryPlus.getQuery(), missingSegments) ); Sequence retrySequence = baseRunner.run(retryQueryPlus, context); listOfSequences.add(retrySequence); diff --git a/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java b/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java index 0beca6849a4f..88c88b324890 100644 --- a/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/org/apache/druid/query/TimewarpOperator.java @@ -92,8 +92,11 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext respo ); return Sequences.map( baseRunner.run( - queryPlus.withQuerySegmentSpec(new MultipleIntervalSegmentSpec( - Collections.singletonList(modifiedInterval))), + queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(modifiedInterval)) + ) + ), responseContext ), new Function() diff --git a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java index 03966e944445..00a84fc92d6d 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java +++ b/processing/src/main/java/org/apache/druid/query/filter/DimFilterUtils.java @@ -20,7 +20,6 @@ package org.apache.druid.query.filter; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.collect.RangeSet; import org.apache.druid.timeline.partition.ShardSpec; @@ -29,9 +28,11 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import 
java.util.Set; /** + * */ public class DimFilterUtils { @@ -87,14 +88,15 @@ static byte[] computeCacheKey(byte cacheIdKey, List filters) * {@link #filterShards(DimFilter, Iterable, Function, Map)} instead with a cached map * * @param dimFilter The filter to use - * @param input The iterable of objects to be filtered + * @param input The iterable of objects to be filtered * @param converter The function to convert T to ShardSpec that can be filtered by - * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * * @return The set of filtered object, in the same order as input */ public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter) { - return filterShards(dimFilter, input, converter, new HashMap>>()); + return filterShards(dimFilter, input, converter, new HashMap<>()); } /** @@ -106,15 +108,20 @@ public static Set filterShards(DimFilter dimFilter, Iterable input, Fu * between calls with the same dimFilter to save redundant calls of {@link DimFilter#getDimensionRangeSet(String)} * on same dimensions. 
* - * @param dimFilter The filter to use - * @param input The iterable of objects to be filtered - * @param converter The function to convert T to ShardSpec that can be filtered by + * @param dimFilter The filter to use + * @param input The iterable of objects to be filtered + * @param converter The function to convert T to ShardSpec that can be filtered by * @param dimensionRangeCache The cache of RangeSets of different dimensions for the dimFilter - * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * @param This can be any type, as long as transform function is provided to convert this to ShardSpec + * * @return The set of filtered object, in the same order as input */ - public static Set filterShards(DimFilter dimFilter, Iterable input, Function converter, - Map>> dimensionRangeCache) + public static Set filterShards( + final DimFilter dimFilter, + final Iterable input, + final Function converter, + final Map>> dimensionRangeCache + ) { Set retSet = new LinkedHashSet<>(); @@ -127,7 +134,7 @@ public static Set filterShards(DimFilter dimFilter, Iterable input, Fu List dimensions = shard.getDomainDimensions(); for (String dimension : dimensions) { Optional> optFilterRangeSet = dimensionRangeCache - .computeIfAbsent(dimension, d -> Optional.fromNullable(dimFilter.getDimensionRangeSet(d))); + .computeIfAbsent(dimension, d -> Optional.ofNullable(dimFilter.getDimensionRangeSet(d))); if (optFilterRangeSet.isPresent()) { filterDomain.put(dimension, optFilterRangeSet.get()); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index acfe77688e6a..ebae09f184f2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -663,6 +663,26 @@ public 
ResultRow apply(Object input) }; } + @Override + public boolean canPerformSubquery(Query subquery) + { + Query current = subquery; + + while (current != null) { + if (!(current instanceof GroupByQuery)) { + return false; + } + + if (current.getDataSource() instanceof QueryDataSource) { + current = ((QueryDataSource) current.getDataSource()).getQuery(); + } else { + current = null; + } + } + + return true; + } + @Override public List resultArrayFields(final GroupByQuery query) { diff --git a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java index 34a458d2aaf3..5d0c853823ad 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java +++ b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java @@ -30,6 +30,7 @@ import org.joda.time.Interval; import java.util.List; +import java.util.Objects; /** */ @@ -93,24 +94,13 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - MultipleSpecificSegmentSpec that = (MultipleSpecificSegmentSpec) o; - - if (descriptors != null ? !descriptors.equals(that.descriptors) : that.descriptors != null) { - return false; - } - if (intervals != null ? !intervals.equals(that.intervals) : that.intervals != null) { - return false; - } - - return true; + return Objects.equals(descriptors, that.descriptors); } @Override public int hashCode() { - int result = descriptors != null ? descriptors.hashCode() : 0; - result = 31 * result + (intervals != null ? 
intervals.hashCode() : 0); - return result; + return Objects.hash(descriptors); } } diff --git a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java index 625f0325229e..0cea0dbf325a 100644 --- a/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/spec/SpecificSegmentQueryRunner.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.guava.YieldingAccumulator; +import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -38,6 +39,7 @@ import java.util.Collections; /** + * */ public class SpecificSegmentQueryRunner implements QueryRunner { @@ -56,7 +58,13 @@ public SpecificSegmentQueryRunner( @Override public Sequence run(final QueryPlus input, final ResponseContext responseContext) { - final QueryPlus queryPlus = input.withQuerySegmentSpec(specificSpec); + final QueryPlus queryPlus = input.withQuery( + Queries.withSpecificSegments( + input.getQuery(), + Collections.singletonList(specificSpec.getDescriptor()) + ) + ); + final Query query = queryPlus.getQuery(); final Thread currThread = Thread.currentThread(); diff --git a/processing/src/test/java/org/apache/druid/query/QueriesTest.java b/processing/src/test/java/org/apache/druid/query/QueriesTest.java index fd5abf238297..16c4783619f8 100644 --- a/processing/src/test/java/org/apache/druid/query/QueriesTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueriesTest.java @@ -20,6 +20,8 @@ package org.apache.druid.query; import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.Intervals; +import 
org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -27,17 +29,26 @@ import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.ConstantPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.Arrays; import java.util.Collections; import java.util.List; /** + * */ public class QueriesTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testVerifyAggregations() { @@ -209,4 +220,114 @@ public void testVerifyAggregationsMultiLevelMissingVal() Assert.assertTrue(exceptionOccured); } + + @Test + public void testWithSpecificSegmentsBasic() + { + final ImmutableList descriptors = ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ); + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ) + ) + ) + .granularity(Granularities.ALL) + .build(), + Queries.withSpecificSegments( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + descriptors + ) + ); + } + + @Test + public void testWithSpecificSegmentsSubQueryStack() + { + final 
ImmutableList descriptors = ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ); + + Assert.assertEquals( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals(new MultipleSpecificSegmentSpec(descriptors)) + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + Queries.withSpecificSegments( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource("foo") + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build() + ) + ) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(), + descriptors + ) + ); + } + + @Test + public void testWithSpecificSegmentsOnUnionIsAnError() + { + final ImmutableList descriptors = ImmutableList.of( + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0), + new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1) + ); + + final TimeseriesQuery query = + Druids.newTimeseriesQueryBuilder() + .dataSource(new LookupDataSource("lookyloo")) + .intervals("2000/3000") + .granularity(Granularities.ALL) + .build(); + + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Unable to apply specific segments to non-table-based dataSource"); + + final Query> ignored = Queries.withSpecificSegments(query, descriptors); + } } diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java 
b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java index 04f86d8ad249..474f7963c58d 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java @@ -471,12 +471,14 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) segments )) { Segment segment = holder.getObject().getChunk(0).getObject(); - QueryPlus queryPlusRunning = queryPlus.withQuerySegmentSpec( - new SpecificSegmentSpec( - new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - 0 + QueryPlus queryPlusRunning = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new SpecificSegmentSpec( + new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + 0 + ) ) ) ); diff --git a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java index b8fa4c2b3112..778db21b0a3e 100644 --- a/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/query/filter/DimFilterUtilsTest.java @@ -20,7 +20,6 @@ package org.apache.druid.query.filter; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableRangeSet; import com.google.common.collect.ImmutableSet; @@ -36,6 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; public class DimFilterUtilsTest diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index ef112f408c52..bdb771c01273 100644 --- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -31,6 +31,8 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.QueryToolChestTestHelper; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -757,6 +759,64 @@ public void testResultsAsArraysDayGran() ); } + @Test + public void testCanPerformSubqueryOnGroupBys() + { + Assert.assertTrue( + new GroupByQueryQueryToolChest(null, null).canPerformSubquery( + new GroupByQuery.Builder() + .setDataSource( + new QueryDataSource( + new GroupByQuery.Builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setGranularity(Granularities.ALL) + .build() + ) + ) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setGranularity(Granularities.ALL) + .build() + ) + ); + } + + @Test + public void testCanPerformSubqueryOnTimeseries() + { + Assert.assertFalse( + new GroupByQueryQueryToolChest(null, null).canPerformSubquery( + Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .granularity(Granularities.ALL) + .build() + ) + ); + } + + @Test + public void testCanPerformSubqueryOnGroupByOfTimeseries() + { + Assert.assertFalse( + new GroupByQueryQueryToolChest(null, null).canPerformSubquery( + new GroupByQuery.Builder() + .setDataSource( + new QueryDataSource( + Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + 
.granularity(Granularities.ALL) + .build() + ) + ) + .setInterval(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) + .setGranularity(Granularities.ALL) + .build() + ) + ); + } + private AggregatorFactory getComplexAggregatorFactoryForValueType(final ValueType valueType) { switch (valueType) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index b8662e01f1df..037b2b75a973 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -2983,11 +2983,15 @@ public void testMergeResults() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -3326,11 +3330,15 @@ private void doTestMergeResultsWithOrderBy( public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + 
final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -4083,11 +4091,15 @@ public void testPostAggMergedHavingSpec() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -4349,11 +4361,15 @@ public void testMergedHavingSpec() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + 
queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -4426,11 +4442,15 @@ public void testMergedPostAggHavingSpec() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return new MergeSequence( queryPlus.getQuery().getResultOrdering(), @@ -9968,11 +9988,15 @@ public void testMergeResultsWithLimitPushDown() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( 
+ new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -10034,11 +10058,15 @@ public void testMergeResultsWithLimitPushDownSortByAgg() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -10102,11 +10130,15 @@ public void testMergeResultsWithLimitPushDownSortByDimDim() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new 
MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( @@ -10183,11 +10215,15 @@ public void testMergeResultsWithLimitPushDownSortByDimAggDim() public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) { // simulate two daily segments - final QueryPlus queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + final QueryPlus queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-02/2011-04-03"))) + ) ); - final QueryPlus queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + final QueryPlus queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-03/2011-04-04"))) + ) ); return factory.getToolchest().mergeResults( diff --git a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java index 6b503a541116..aebcf257b3e5 100644 --- a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java @@ -72,6 +72,7 @@ import java.util.List; /** + * */ @RunWith(Parameterized.class) public class SearchQueryRunnerTest 
extends InitializedNullHandlingTest @@ -167,11 +168,15 @@ public Sequence> run( ResponseContext responseContext ) { - final QueryPlus> queryPlus1 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-01-12/2011-02-28"))) + final QueryPlus> queryPlus1 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-01-12/2011-02-28"))) + ) ); - final QueryPlus> queryPlus2 = queryPlus.withQuerySegmentSpec( - new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-03-01/2011-04-15"))) + final QueryPlus> queryPlus2 = queryPlus.withQuery( + queryPlus.getQuery().withQuerySegmentSpec( + new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-03-01/2011-04-15"))) + ) ); return Sequences.concat(runner.run(queryPlus1, responseContext), runner.run(queryPlus2, responseContext)); } diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 4b365b7ce89e..5bf4ee9c911e 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.druid.client.selector.QueryableDruidServer; @@ -30,26 +29,28 @@ import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import 
org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.query.DataSource; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryWatcher; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import javax.annotation.Nullable; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -289,14 +290,15 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen } } - - @Nullable @Override - public VersionedIntervalTimeline getTimeline(DataSource dataSource) + public Optional> getTimeline(final DataSourceAnalysis analysis) { - String table = Iterables.getOnlyElement(dataSource.getTableNames()); + final TableDataSource tableDataSource = + analysis.getBaseTableDataSource() + .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); + synchronized (lock) { - return timelines.get(table); + return Optional.ofNullable(timelines.get(tableDataSource.getName())); } } diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 9fbc354109cb..ba8b3ec09523 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ 
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -53,6 +52,7 @@ import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryMetrics; @@ -66,7 +66,8 @@ import org.apache.druid.query.aggregation.MetricManipulatorFns; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.filter.DimFilterUtils; -import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.server.QueryResource; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; @@ -88,6 +89,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -97,6 +99,7 @@ import java.util.stream.Collectors; /** + * */ public class CachingClusteredClient implements QuerySegmentWalker { @@ -231,6 +234,7 @@ private class SpecificQueryRunnable private final int uncoveredIntervalsLimit; private final Query downstreamQuery; private final Map cachePopulatorKeyMap = new HashMap<>(); + private final DataSourceAnalysis dataSourceAnalysis; private final List intervals; SpecificQueryRunnable(final QueryPlus queryPlus, final ResponseContext responseContext) @@ -248,8 +252,11 @@ 
private class SpecificQueryRunnable // and might blow up in some cases https://github.com/apache/druid/issues/2108 this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query); this.downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext()); + this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource()); // For nested queries, we need to look at the intervals of the inner most query. - this.intervals = query.getIntervalsOfInnerMostQuery(); + this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec() + .map(QuerySegmentSpec::getIntervals) + .orElse(query.getIntervals()); } private ImmutableMap makeDownstreamQueryContext() @@ -269,12 +276,14 @@ private ImmutableMap makeDownstreamQueryContext() Sequence run(final UnaryOperator> timelineConverter) { - @Nullable - TimelineLookup timeline = serverView.getTimeline(query.getDataSource()); - if (timeline == null) { + final Optional> maybeTimeline = serverView.getTimeline( + dataSourceAnalysis + ); + if (!maybeTimeline.isPresent()) { return Sequences.empty(); } - timeline = timelineConverter.apply(timeline); + + final TimelineLookup timeline = timelineConverter.apply(maybeTimeline.get()); if (uncoveredIntervalsLimit > 0) { computeUncoveredIntervals(timeline); } @@ -598,19 +607,17 @@ private void addSequencesFromServer( return; } - final MultipleSpecificSegmentSpec segmentsOfServerSpec = new MultipleSpecificSegmentSpec(segmentsOfServer); - // Divide user-provided maxQueuedBytes by the number of servers, and limit each server to that much. 
final long maxQueuedBytes = QueryContexts.getMaxQueuedBytes(query, httpClientConfig.getMaxQueuedBytes()); final long maxQueuedBytesPerServer = maxQueuedBytes / segmentsByServer.size(); final Sequence serverResults; if (isBySegment) { - serverResults = getBySegmentServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); + serverResults = getBySegmentServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } else if (!server.segmentReplicatable() || !populateCache) { - serverResults = getSimpleServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); + serverResults = getSimpleServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } else { - serverResults = getAndCacheServerResults(serverRunner, segmentsOfServerSpec, maxQueuedBytesPerServer); + serverResults = getAndCacheServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer); } listOfSequences.add(serverResults); }); @@ -619,13 +626,15 @@ private void addSequencesFromServer( @SuppressWarnings("unchecked") private Sequence getBySegmentServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec, + final List segmentsOfServer, long maxQueuedBytesPerServer ) { Sequence>> resultsBySegments = serverRunner .run( - queryPlus.withQuerySegmentSpec(segmentsOfServerSpec).withMaxQueuedBytes(maxQueuedBytesPerServer), + queryPlus.withQuery( + Queries.withSpecificSegments(queryPlus.getQuery(), segmentsOfServer) + ).withMaxQueuedBytes(maxQueuedBytesPerServer), responseContext ); // bySegment results need to be de-serialized, see DirectDruidClient.run() @@ -640,27 +649,33 @@ private Sequence getBySegmentServerResults( @SuppressWarnings("unchecked") private Sequence getSimpleServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec, + final List segmentsOfServer, long maxQueuedBytesPerServer ) { return serverRunner.run( - 
queryPlus.withQuerySegmentSpec(segmentsOfServerSpec).withMaxQueuedBytes(maxQueuedBytesPerServer), + queryPlus.withQuery( + Queries.withSpecificSegments(queryPlus.getQuery(), segmentsOfServer) + ).withMaxQueuedBytes(maxQueuedBytesPerServer), responseContext ); } private Sequence getAndCacheServerResults( final QueryRunner serverRunner, - final MultipleSpecificSegmentSpec segmentsOfServerSpec, + final List segmentsOfServer, long maxQueuedBytesPerServer ) { @SuppressWarnings("unchecked") final Sequence>> resultsBySegments = serverRunner.run( queryPlus - .withQuery((Query>>) downstreamQuery) - .withQuerySegmentSpec(segmentsOfServerSpec) + .withQuery( + Queries.withSpecificSegments( + (Query>>) downstreamQuery, + segmentsOfServer + ) + ) .withMaxQueuedBytes(maxQueuedBytesPerServer), responseContext ); diff --git a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java index 8a52cacf2995..b9f1f91a5d35 100644 --- a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java +++ b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java @@ -24,6 +24,7 @@ import org.apache.druid.query.LocatedSegmentDescriptor; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.TimelineObjectHolder; @@ -33,6 +34,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; /** */ @@ -55,13 +57,14 @@ public static List getTargetLocations( int numCandidates ) { - TimelineLookup timeline = serverView.getTimeline(datasource); - if (timeline == null) { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(datasource); + final Optional> maybeTimeline = serverView.getTimeline(analysis); + if 
(!maybeTimeline.isPresent()) { return Collections.emptyList(); } List located = new ArrayList<>(); for (Interval interval : intervals) { - for (TimelineObjectHolder holder : timeline.lookup(interval)) { + for (TimelineObjectHolder holder : maybeTimeline.get().lookup(interval)) { for (PartitionChunk chunk : holder.getObject()) { ServerSelector selector = chunk.getObject(); final SegmentDescriptor descriptor = new SegmentDescriptor( diff --git a/server/src/main/java/org/apache/druid/client/TimelineServerView.java b/server/src/main/java/org/apache/druid/client/TimelineServerView.java index ed1d4dfb7313..477882342425 100644 --- a/server/src/main/java/org/apache/druid/client/TimelineServerView.java +++ b/server/src/main/java/org/apache/druid/client/TimelineServerView.java @@ -20,22 +20,31 @@ package org.apache.druid.client; import org.apache.druid.client.selector.ServerSelector; -import org.apache.druid.query.DataSource; import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; -import javax.annotation.Nullable; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; /** */ public interface TimelineServerView extends ServerView { - @Nullable - TimelineLookup getTimeline(DataSource dataSource); + /** + * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based + * datasource of a single table. 
+ * + * @param analysis data source analysis information + * + * @return timeline, if it exists + * + * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table + */ + Optional> getTimeline(DataSourceAnalysis analysis); /** * Returns a list of {@link ImmutableDruidServer} diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 451e079b26b4..fb7b8067008f 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -20,7 +20,6 @@ package org.apache.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import org.apache.druid.client.CachingQueryRunner; @@ -42,6 +41,7 @@ import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; @@ -60,16 +60,19 @@ import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; import java.io.Closeable; +import java.util.Optional; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.atomic.AtomicLong; +/** + * Query handler for indexing tasks. + */ public class SinkQuerySegmentWalker implements QuerySegmentWalker { private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class); @@ -118,40 +121,17 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, final { final Iterable specs = FunctionalIterable .create(intervals) + .transformCat(sinkTimeline::lookup) .transformCat( - new Function>>() - { - @Override - public Iterable> apply(final Interval interval) - { - return sinkTimeline.lookup(interval); - } - } - ) - .transformCat( - new Function, Iterable>() - { - @Override - public Iterable apply(final TimelineObjectHolder holder) - { - return FunctionalIterable - .create(holder.getObject()) - .transform( - new Function, SegmentDescriptor>() - { - @Override - public SegmentDescriptor apply(final PartitionChunk chunk) - { - return new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - chunk.getChunkNumber() - ); - } - } - ); - } - } + holder -> FunctionalIterable + .create(holder.getObject()) + .transform( + chunk -> new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + chunk.getChunkNumber() + ) + ) ); return getQueryRunnerForSegments(query, specs); @@ -161,16 +141,15 @@ public SegmentDescriptor apply(final PartitionChunk chunk) public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) { // We only handle one particular dataSource. Make sure that's what we have, then ignore from here on out. 
- if (!(query.getDataSource() instanceof TableDataSource) - || !dataSource.equals(((TableDataSource) query.getDataSource()).getName())) { - log.makeAlert("Received query for unknown dataSource") - .addData("dataSource", query.getDataSource()) - .emit(); - return new NoopQueryRunner<>(); + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + final Optional baseTableDataSource = analysis.getBaseTableDataSource(); + + if (!baseTableDataSource.isPresent() || !dataSource.equals(baseTableDataSource.get().getName())) { + // Report error, since we somehow got a query for a datasource we can't handle. + throw new ISE("Cannot handle datasource: %s", analysis.getDataSource()); } // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. - final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); if (!analysis.getPreJoinableClauses().isEmpty()) { throw new ISE("Cannot handle join dataSource"); } @@ -184,6 +163,11 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false); final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); + // Make sure this query type can handle the subquery, if present. 
+ if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) { + throw new ISE("Cannot handle subquery: %s", analysis.getDataSource()); + } + Iterable> perSegmentRunners = Iterables.transform( specs, descriptor -> { diff --git a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java index 24f6c4435d2f..08032a5efe39 100644 --- a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java +++ b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java @@ -37,6 +37,7 @@ import org.apache.druid.query.LocatedSegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.metadata.SegmentMetadataQueryConfig; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.http.security.DatasourceResourceFilter; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthorizationUtils; @@ -64,12 +65,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.Stream; /** + * */ @Path("/druid/v2/datasources") public class ClientInfoResource @@ -152,12 +155,12 @@ KEY_METRICS, getDataSourceMetrics(dataSourceName, interval) theInterval = Intervals.of(interval); } - TimelineLookup timeline = timelineServerView.getTimeline(new TableDataSource(dataSourceName)); - Iterable> serversLookup = timeline != null ? 
timeline.lookup( - theInterval - ) : null; - if (serversLookup == null || Iterables.isEmpty(serversLookup)) { - return Collections.EMPTY_MAP; + final Optional> maybeTimeline = + timelineServerView.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(dataSourceName))); + final Optional>> maybeServersLookup = + maybeTimeline.map(timeline -> timeline.lookup(theInterval)); + if (!maybeServersLookup.isPresent() || Iterables.isEmpty(maybeServersLookup.get())) { + return Collections.emptyMap(); } Map servedIntervals = new TreeMap<>( new Comparator() @@ -174,7 +177,7 @@ public int compare(Interval o1, Interval o2) } ); - for (TimelineObjectHolder holder : serversLookup) { + for (TimelineObjectHolder holder : maybeServersLookup.get()) { final Set dimensions = new HashSet<>(); final Set metrics = new HashSet<>(); final PartitionHolder partitionHolder = holder.getObject(); diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index ddc15aedef07..b484486c03bf 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -43,7 +43,7 @@ import org.joda.time.Interval; /** - * + * Query handler for Broker processes (see CliBroker). */ public class ClientQuerySegmentWalker implements QuerySegmentWalker { @@ -56,7 +56,6 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private final Cache cache; private final CacheConfig cacheConfig; - @Inject public ClientQuerySegmentWalker( ServiceEmitter emitter, @@ -82,25 +81,27 @@ public ClientQuerySegmentWalker( @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { - // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. 
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); - if (!analysis.getPreJoinableClauses().isEmpty()) { - throw new ISE("Cannot handle join dataSource"); - } - return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals)); + if (analysis.isConcreteTableBased()) { + return makeRunner(query, baseClient.getQueryRunnerForIntervals(query, intervals)); + } else { + // In the future, we will check here to see if parts of the query are inlinable, and if that inlining would + // be able to create a concrete table-based query that we can run through the distributed query stack. + throw new ISE("Query dataSource is not table-based, cannot run"); + } } @Override public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) { - // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); - if (!analysis.getPreJoinableClauses().isEmpty()) { - throw new ISE("Cannot handle join dataSource"); - } - return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs)); + if (analysis.isConcreteTableBased()) { + return makeRunner(query, baseClient.getQueryRunnerForSegments(query, specs)); + } else { + throw new ISE("Query dataSource is not table-based, cannot run"); + } } private QueryRunner makeRunner(Query query, QueryRunner baseClientRunner) @@ -126,9 +127,7 @@ private QueryRunner makeRunner( { PostProcessingOperator postProcessing = objectMapper.convertValue( query.getContextValue("postProcessing"), - new TypeReference>() - { - } + new TypeReference>() {} ); return new FluentQueryRunnerBuilder<>(toolChest) diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index dbb48474defc..45b6538bb610 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ 
b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -23,7 +23,10 @@ import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.druid.common.guava.SettableSupplier; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.loading.SegmentLoader; @@ -35,8 +38,8 @@ import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.utils.CollectionUtils; -import javax.annotation.Nullable; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /** @@ -134,19 +137,30 @@ public boolean isSegmentCached(final DataSegment segment) return segmentLoader.isSegmentLoaded(segment); } - @Nullable - public VersionedIntervalTimeline getTimeline(String dataSource) + /** + * Returns the timeline for a datasource, if it exists. The analysis object passed in must represent a scan-based + * datasource of a single table. + * + * @param analysis data source analysis information + * + * @return timeline, if it exists + * + * @throws IllegalStateException if 'analysis' does not represent a scan-based datasource of a single table + */ + public Optional> getTimeline(DataSourceAnalysis analysis) { - final DataSourceState dataSourceState = dataSources.get(dataSource); - return dataSourceState == null ? null : dataSourceState.getTimeline(); + final TableDataSource tableDataSource = + analysis.getBaseTableDataSource() + .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); + + return Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTimeline); } /** * Load a single segment. 
* * @param segment segment to load - * - * @param lazy whether to lazy load columns metadata + * @param lazy whether to lazy load columns metadata * * @return true if the segment was newly loaded, false if it was already loaded * diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index 41c71160e5ec..d4c672c91f4b 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -20,8 +20,6 @@ package org.apache.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.client.CachingQueryRunner; import org.apache.druid.client.cache.Cache; @@ -35,7 +33,6 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.CPUTimeMetricQueryRunner; -import org.apache.druid.query.DataSource; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.NoopQueryRunner; @@ -52,7 +49,6 @@ import org.apache.druid.query.ReferenceCountingSegmentQueryRunner; import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; @@ -61,17 +57,19 @@ import org.apache.druid.server.SetAndVerifyContextQueryRunner; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.timeline.SegmentId; -import 
org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; +/** + * Query handler for Historical processes (see CliHistorical). + */ public class ServerManager implements QuerySegmentWalker { private static final EmittingLogger log = new EmittingLogger(ServerManager.class); @@ -111,110 +109,49 @@ public ServerManager( this.serverConfig = serverConfig; } - private DataSource getInnerMostDataSource(DataSource dataSource) - { - if (dataSource instanceof QueryDataSource) { - return getInnerMostDataSource(((QueryDataSource) dataSource).getQuery().getDataSource()); - } - return dataSource; - } - @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { - final QueryRunnerFactory> factory = conglomerate.findFactory(query); - if (factory == null) { - throw new ISE("Unknown query type[%s].", query.getClass()); - } - - // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. 
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); if (!analysis.getPreJoinableClauses().isEmpty()) { throw new ISE("Cannot handle join dataSource"); } - final QueryToolChest> toolChest = factory.getToolchest(); - final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); - - DataSource dataSource = getInnerMostDataSource(query.getDataSource()); - if (!(dataSource instanceof TableDataSource)) { - throw new UnsupportedOperationException("data source type '" + dataSource.getClass().getName() + "' unsupported"); - } - String dataSourceName = getDataSourceName(dataSource); - - final VersionedIntervalTimeline timeline = segmentManager.getTimeline( - dataSourceName - ); + final VersionedIntervalTimeline timeline; + final Optional> maybeTimeline = + segmentManager.getTimeline(analysis); - if (timeline == null) { - return new NoopQueryRunner(); + if (maybeTimeline.isPresent()) { + timeline = maybeTimeline.get(); + } else { + // Note: this is not correct when there's a right or full outer join going on. + // See https://github.com/apache/druid/issues/9229 for details. 
+ return new NoopQueryRunner<>(); } - FunctionalIterable> queryRunners = FunctionalIterable + FunctionalIterable segmentDescriptors = FunctionalIterable .create(intervals) + .transformCat(timeline::lookup) .transformCat( - new Function>>() - { - @Override - public Iterable> apply(Interval input) - { - return timeline.lookup(input); - } - } - ) - .transformCat( - new Function, Iterable>>() - { - @Override - public Iterable> apply( - @Nullable final TimelineObjectHolder holder - ) - { - if (holder == null) { - return null; - } - - return FunctionalIterable - .create(holder.getObject()) - .transform( - new Function, QueryRunner>() - { - @Override - public QueryRunner apply(PartitionChunk input) - { - return buildAndDecorateQueryRunner( - factory, - toolChest, - input.getObject(), - new SegmentDescriptor( - holder.getInterval(), - holder.getVersion(), - input.getChunkNumber() - ), - cpuTimeAccumulator - ); - } - } - ); + holder -> { + if (holder == null) { + return null; } + + return FunctionalIterable + .create(holder.getObject()) + .transform( + partitionChunk -> + new SegmentDescriptor( + holder.getInterval(), + holder.getVersion(), + partitionChunk.getChunkNumber() + ) + ); } ); - return CPUTimeMetricQueryRunner.safeBuild( - new FinalizeResultsQueryRunner<>( - toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)), - toolChest - ), - toolChest, - emitter, - cpuTimeAccumulator, - true - ); - } - - private String getDataSourceName(DataSource dataSource) - { - return Iterables.getOnlyElement(dataSource.getTableNames()); + return getQueryRunnerForSegments(query, segmentDescriptors); } @Override @@ -225,7 +162,7 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable(); + return new NoopQueryRunner<>(); } // Sanity check: we cannot actually handle joins yet, so detect them and throw an error. 
@@ -235,48 +172,53 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable> toolChest = factory.getToolchest(); + final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); - String dataSourceName = getDataSourceName(query.getDataSource()); - - final VersionedIntervalTimeline timeline = segmentManager.getTimeline( - dataSourceName - ); + final VersionedIntervalTimeline timeline; + final Optional> maybeTimeline = + segmentManager.getTimeline(analysis); - if (timeline == null) { - return new NoopQueryRunner(); + // Make sure this query type can handle the subquery, if present. + if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) { + throw new ISE("Cannot handle subquery: %s", analysis.getDataSource()); } - final AtomicLong cpuTimeAccumulator = new AtomicLong(0L); + if (maybeTimeline.isPresent()) { + timeline = maybeTimeline.get(); + } else { + // Note: this is not correct when there's a right or full outer join going on. + // See https://github.com/apache/druid/issues/9229 for details. 
+ return new NoopQueryRunner<>(); + } FunctionalIterable> queryRunners = FunctionalIterable .create(specs) .transformCat( - new Function>>() - { - @Override - @SuppressWarnings("unchecked") - public Iterable> apply(SegmentDescriptor input) - { - - final PartitionHolder entry = timeline.findEntry( - input.getInterval(), input.getVersion() - ); - - if (entry == null) { - return Collections.singletonList( - new ReportTimelineMissingSegmentQueryRunner(input)); - } - - final PartitionChunk chunk = entry.getChunk(input.getPartitionNumber()); - if (chunk == null) { - return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner(input)); - } - - final ReferenceCountingSegment adapter = chunk.getObject(); - return Collections.singletonList( - buildAndDecorateQueryRunner(factory, toolChest, adapter, input, cpuTimeAccumulator) - ); + descriptor -> { + final PartitionHolder entry = timeline.findEntry( + descriptor.getInterval(), + descriptor.getVersion() + ); + + if (entry == null) { + return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor)); } + + final PartitionChunk chunk = entry.getChunk(descriptor.getPartitionNumber()); + if (chunk == null) { + return Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor)); + } + + final ReferenceCountingSegment segment = chunk.getObject(); + return Collections.singletonList( + buildAndDecorateQueryRunner( + factory, + toolChest, + segment, + descriptor, + cpuTimeAccumulator + ) + ); } ); diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index b630b8ec6c31..dd3961e04f42 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -39,6 +39,7 @@ import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryWatcher; import 
org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; @@ -114,7 +115,9 @@ public void testSingleServerAddedRemovedSegment() throws Exception Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); - TimelineLookup timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view")); + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) + ).get(); List serverLookupRes = (List) timeline.lookup( Intervals.of( "2014-10-20T00:00:00Z/P1D" @@ -203,7 +206,9 @@ public DataSegment apply(Pair input) Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); - TimelineLookup timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view")); + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) + ).get(); assertValues( Arrays.asList( createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), @@ -224,7 +229,9 @@ public DataSegment apply(Pair input) // renew segmentRemovedLatch since we still have 4 segments to unannounce segmentRemovedLatch = new CountDownLatch(4); - timeline = brokerServerView.getTimeline(new TableDataSource("test_broker_server_view")); + timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) + ).get(); assertValues( Arrays.asList( createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index a75ad64e080c..3efe7bc5d759 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.query.DataSource; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; @@ -47,8 +46,10 @@ import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.SingleElementPartitionChunk; @@ -64,11 +65,13 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; /** + * */ public class CachingClusteredClientFunctionalityTest { @@ -245,9 +248,9 @@ public void registerSegmentCallback(Executor exec, SegmentCallback callback) } @Override - public VersionedIntervalTimeline getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - return timeline; + return Optional.of(timeline); } @Nullable diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index bd91104aac41..9c0588837ffd 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -68,7 +68,6 @@ import org.apache.druid.java.util.common.guava.nary.TrinaryFn; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.BySegmentResultValueClass; -import org.apache.druid.query.DataSource; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Druids; import org.apache.druid.query.FinalizeResultsQueryRunner; @@ -98,6 +97,7 @@ import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.strategy.GroupByStrategySelector; import org.apache.druid.query.ordering.StringComparators; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.search.SearchHit; import org.apache.druid.query.search.SearchQuery; import org.apache.druid.query.search.SearchQueryConfig; @@ -148,6 +148,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.Callable; @@ -2391,9 +2392,9 @@ public void registerSegmentCallback(Executor exec, SegmentCallback callback) } @Override - public VersionedIntervalTimeline getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { - return timeline; + return Optional.of(timeline); } @Override diff --git a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java index 60b633d22624..f6c880c0e205 100644 --- a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java +++ 
b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java @@ -30,8 +30,8 @@ import org.apache.druid.client.selector.RandomServerSelectorStrategy; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.query.TableDataSource; import org.apache.druid.query.metadata.SegmentMetadataQueryConfig; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.timeline.DataSegment; @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; public class ClientInfoResourceTest { @@ -128,7 +129,8 @@ public void setup() EasyMock.expect(serverInventoryView.getInventory()).andReturn(ImmutableList.of(server)).anyTimes(); timelineServerView = EasyMock.createMock(TimelineServerView.class); - EasyMock.expect(timelineServerView.getTimeline(EasyMock.anyObject(TableDataSource.class))).andReturn(timeline); + EasyMock.expect(timelineServerView.getTimeline(EasyMock.anyObject(DataSourceAnalysis.class))) + .andReturn((Optional) Optional.of(timeline)); EasyMock.replay(serverInventoryView, timelineServerView); diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index b6c9c1671fc3..04f796e9918d 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -24,6 +24,8 @@ import com.google.common.collect.Ordering; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.MapUtils; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.AbstractSegment; import org.apache.druid.segment.QueryableIndex; import 
org.apache.druid.segment.ReferenceCountingSegment; @@ -49,6 +51,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -382,7 +385,10 @@ public void testRemoveEmptyTimeline() throws SegmentLoadingException @Test public void testGetNonExistingTimeline() { - Assert.assertNull(segmentManager.getTimeline("nonExisting")); + Assert.assertEquals( + Optional.empty(), + segmentManager.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource("nonExisting"))) + ); } @Test @@ -448,7 +454,10 @@ private void assertResult(List expectedExistingSegments) throws Seg dataSources.forEach( (sourceName, dataSourceState) -> { Assert.assertEquals(expectedDataSourceCounts.get(sourceName).longValue(), dataSourceState.getNumSegments()); - Assert.assertEquals(expectedDataSourceSizes.get(sourceName).longValue(), dataSourceState.getTotalSegmentSize()); + Assert.assertEquals( + expectedDataSourceSizes.get(sourceName).longValue(), + dataSourceState.getTotalSegmentSize() + ); Assert.assertEquals( expectedDataSources.get(sourceName).getAllTimelineEntries(), dataSourceState.getTimeline().getAllTimelineEntries() diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java index 1910bc27b654..5aacb8773f37 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java @@ -37,8 +37,9 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.math.expr.Evals; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.spec.QuerySegmentSpec; import 
org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.column.ColumnHolder; @@ -48,6 +49,7 @@ import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.joda.time.DateTime; +import org.joda.time.Interval; import java.io.IOException; import java.util.Collection; @@ -87,8 +89,7 @@ public Sequence runQuery(final DruidQuery druidQuery) final Query query = druidQuery.getQuery(); if (plannerContext.getPlannerConfig().isRequireTimeCondition()) { - final Query innerMostQuery = findInnerMostQuery(query); - if (innerMostQuery.getIntervals().equals(Intervals.ONLY_ETERNITY)) { + if (Intervals.ONLY_ETERNITY.equals(findBaseDataSourceIntervals(query))) { throw new CannotBuildQueryException( "requireTimeCondition is enabled, all queries must include a filter condition on the __time column" ); @@ -121,13 +122,12 @@ public Sequence runQuery(final DruidQuery druidQuery) ); } - private Query findInnerMostQuery(Query outerQuery) + private List findBaseDataSourceIntervals(Query query) { - Query query = outerQuery; - while (query.getDataSource() instanceof QueryDataSource) { - query = ((QueryDataSource) query.getDataSource()).getQuery(); - } - return query; + return DataSourceAnalysis.forDataSource(query.getDataSource()) + .getBaseQuerySegmentSpec() + .map(QuerySegmentSpec::getIntervals) + .orElse(query.getIntervals()); } private Sequence execute(Query query, final List newFields, final List newTypes) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java index 0f36273035a7..a8e498beb1f0 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java @@ -26,8 +26,8 @@ import 
org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.selector.ServerSelector; -import org.apache.druid.query.DataSource; import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; /** @@ -77,7 +78,7 @@ public TestServerInventoryView(List segments, List rea } @Override - public TimelineLookup getTimeline(DataSource dataSource) + public Optional> getTimeline(DataSourceAnalysis analysis) { throw new UnsupportedOperationException(); }