diff --git a/docs/configuration/index.md b/docs/configuration/index.md index e6650b1cd38a..dc43390ed98b 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1675,6 +1675,7 @@ The Druid SQL server is configured through the following properties on the Broke |`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000| |`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M| |`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|true| +|`druid.sql.planner.useGroupingSetForExactDistinct`|Only relevant when `useApproximateCountDistinct` is disabled. If set to true, exact distinct queries are rewritten using grouping sets; otherwise, they are rewritten using joins. Set this to true for groupBy queries with multiple exact distinct aggregations. This flag can be overridden per query.|false| |`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN queries](../querying/topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](../querying/groupbyquery.md) will be used instead.|true| |`druid.sql.planner.requireTimeCondition`|Whether to require SQL to have filter conditions on __time column so that all generated native queries will have user specified intervals. If true, all queries without filter condition on __time column will fail|false| |`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC| diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 38625cb190a8..c05b648cd647 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -322,7 +322,7 @@ Only the COUNT aggregation can accept DISTINCT. |Function|Notes| |--------|-----| |`COUNT(*)`|Counts the number of rows.| -|`COUNT(DISTINCT expr)`|Counts distinct values of expr, which can be string, numeric, or hyperUnique. By default this is approximate, using a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). To get exact counts set "useApproximateCountDistinct" to "false". If you do this, expr must be string or numeric, since exact counts are not possible using hyperUnique columns. See also `APPROX_COUNT_DISTINCT(expr)`. In exact mode, only one distinct count per query is permitted.| +|`COUNT(DISTINCT expr)`|Counts distinct values of expr, which can be string, numeric, or hyperUnique. By default this is approximate, using a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). To get exact counts set "useApproximateCountDistinct" to "false". If you do this, expr must be string or numeric, since exact counts are not possible using hyperUnique columns. See also `APPROX_COUNT_DISTINCT(expr)`. In exact mode, only one distinct count per query is permitted unless `useGroupingSetForExactDistinct` is set to true in the query context or Broker configuration.| |`SUM(expr)`|Sums numbers.| |`MIN(expr)`|Takes the minimum of numbers.| |`MAX(expr)`|Takes the maximum of numbers.| @@ -1007,6 +1007,7 @@ Connection context can be specified as JDBC connection properties or as a "conte |`sqlQueryId`|Unique identifier given to this SQL query.
For HTTP client, it will be returned in `X-Druid-SQL-Query-Id` header.|auto-generated| |`sqlTimeZone`|Sets the time zone for this connection, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|druid.sql.planner.sqlTimeZone on the Broker (default: UTC)| |`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the Broker (default: true)| +|`useGroupingSetForExactDistinct`|Whether to use grouping sets to execute queries with multiple exact distinct aggregations.|druid.sql.planner.useGroupingSetForExactDistinct on the Broker (default: false)| |`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.md) will be used instead.|druid.sql.planner.useApproximateTopN on the Broker (default: true)| ## Metadata tables diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 95338f5e6134..23d904ac2ffb 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -20,6 +20,7 @@ package org.apache.druid.query.groupby.strategy; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; @@ -85,8 +86,8 @@ public class GroupByStrategyV2 implements GroupByStrategy public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp"; public static final String CTX_KEY_OUTERMOST = "groupByOutermost"; - // see countRequiredMergeBufferNum() for explanation - private static final int MAX_MERGE_BUFFER_NUM = 2; + // see countRequiredMergeBufferNumWithoutSubtotal() for explanation + private static final int MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL = 2; private final DruidProcessingConfig processingConfig; private final Supplier<GroupByQueryConfig> configSupplier; @@ -116,8 +117,7 @@ public GroupByStrategyV2( @Override public GroupByQueryResource prepareResource(GroupByQuery query) { - final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1) + - numMergeBuffersNeededForSubtotalsSpec(query); + final int requiredMergeBufferNum = countRequiredMergeBufferNum(query); if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { throw new ResourceLimitExceededException( @@ -146,7 +146,13 @@ public GroupByQueryResource prepareResource(GroupByQuery query) } } - private static int countRequiredMergeBufferNum(Query query, int foundNum) + @VisibleForTesting + public static int countRequiredMergeBufferNum(GroupByQuery query) + { + return countRequiredMergeBufferNumWithoutSubtotal(query, 1) + numMergeBuffersNeededForSubtotalsSpec(query); + } + + private static int countRequiredMergeBufferNumWithoutSubtotal(Query query, int foundNum) { // Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one. // For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1.
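To make this accounting concrete before the recursion below, here is a back-of-the-envelope model. It is a hypothetical standalone sketch, not the real `GroupByQuery` API: it reduces a query to its nesting depth plus the merge buffers each layer's subtotals spec needs on its own, and it reproduces the totals asserted in the updated `GroupByQueryMergeBufferTest` further down.

```java
// Hypothetical sketch of the merge buffer accounting in this patch.
public class MergeBufferMath
{
  // Mirrors MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL: nesting alone never needs more than 2.
  static final int MAX_WITHOUT_SUBTOTAL = 2;

  // nestingDepth: number of groupBy layers (1 = groupBy directly on a table).
  // subtotalBuffersPerLayer: 0 (no subtotalsSpec), 1 (all subtotals are prefixes of the
  // query dimensions), or 2 (some subtotal is not a prefix), outermost layer first.
  static int countRequiredMergeBufferNum(int nestingDepth, int... subtotalBuffersPerLayer)
  {
    int withoutSubtotal = Math.min(nestingDepth - 1, MAX_WITHOUT_SUBTOTAL);
    int subtotal = 0;
    for (int needed : subtotalBuffersPerLayer) {
      subtotal = Math.max(subtotal, needed); // subquery subtotal needs propagate outward
    }
    return withoutSubtotal + subtotal;
  }

  public static void main(String[] args)
  {
    System.out.println(countRequiredMergeBufferNum(1, 0));          // simple groupBy -> 0
    System.out.println(countRequiredMergeBufferNum(2, 0, 0));       // nested -> 1
    System.out.println(countRequiredMergeBufferNum(4, 0, 0, 0, 0)); // triple nested -> 2 (capped)
    System.out.println(countRequiredMergeBufferNum(1, 2));          // non-prefix subtotals -> 2
    System.out.println(countRequiredMergeBufferNum(2, 2, 2));       // nested subtotals -> 3
  }
}
```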
@@ -156,10 +162,10 @@ private static int countRequiredMergeBufferNum(Query query, int foundNum) // This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2. final DataSource dataSource = query.getDataSource(); - if (foundNum == MAX_MERGE_BUFFER_NUM + 1 || !(dataSource instanceof QueryDataSource)) { + if (foundNum == MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL + 1 || !(dataSource instanceof QueryDataSource)) { return foundNum - 1; } else { - return countRequiredMergeBufferNum(((QueryDataSource) dataSource).getQuery(), foundNum + 1); + return countRequiredMergeBufferNumWithoutSubtotal(((QueryDataSource) dataSource).getQuery(), foundNum + 1); } } @@ -522,11 +528,20 @@ private Set<String> getAggregatorAndPostAggregatorNames(GroupByQuery query) return aggsAndPostAggs; } - private int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query) + private static int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query) { List<List<String>> subtotalSpecs = query.getSubtotalsSpec(); + final DataSource dataSource = query.getDataSource(); + int numMergeBuffersNeededForSubQuerySubtotal = 0; + if (dataSource instanceof QueryDataSource) { + Query subQuery = ((QueryDataSource) dataSource).getQuery(); + if (subQuery instanceof GroupByQuery) { + numMergeBuffersNeededForSubQuerySubtotal = numMergeBuffersNeededForSubtotalsSpec((GroupByQuery) subQuery); + } + + } if (subtotalSpecs == null || subtotalSpecs.size() == 0) { - return 0; + return numMergeBuffersNeededForSubQuerySubtotal; } List<String> queryDimOutputNames = query.getDimensions().stream().map(DimensionSpec::getOutputName).collect( @@ -537,7 +552,7 @@ private int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query) } } - return 1; + return Math.max(1, numMergeBuffersNeededForSubQuerySubtotal); } @Override diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java index 414a89fae696..faed09af603e 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -51,6 +51,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -109,7 +110,7 @@ public int intermediateComputeSizeBytes() @Override public int getNumMergeBuffers() { - return 3; + return 4; } @Override @@ -211,10 +212,11 @@ public void testSimpleGroupBy() .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT)) .build(); + Assert.assertEquals(0, GroupByStrategyV2.countRequiredMergeBufferNum(query)); GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query); - Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum()); - Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize()); + Assert.assertEquals(3, MERGE_BUFFER_POOL.getMinRemainBufferNum()); + Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize()); } @Test @@ -239,10 +241,11 @@ public void testNestedGroupBy() .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT)) .build(); + Assert.assertEquals(1, GroupByStrategyV2.countRequiredMergeBufferNum(query)); GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query); - Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum()); - Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize()); + Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum()); +
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize()); } @Test @@ -278,11 +281,12 @@ public void testDoubleNestedGroupBy() .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT)) .build(); + Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query)); GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query); - // This should be 0 because the broker needs 2 buffers and the queryable node needs one. - Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum()); - Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize()); + // This should be 1 because the broker needs 2 buffers and the queryable node needs one. + Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum()); + Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize()); } @Test @@ -331,10 +335,157 @@ public void testTripleNestedGroupBy() .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT)) .build(); + Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query)); + GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query); + + // This should be 1 because the broker needs 2 buffers and the queryable node needs one. + Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum()); + Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize()); + } + + @Test + public void testSimpleGroupByWithSubtotals() + { + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setDimensions(Arrays.asList( + DefaultDimensionSpec.of(QueryRunnerTestHelper.MARKET_DIMENSION), + DefaultDimensionSpec.of(QueryRunnerTestHelper.PLACEMENT_DIMENSION), + DefaultDimensionSpec.of(QueryRunnerTestHelper.QUALITY_DIMENSION) + )) + .setGranularity(Granularities.ALL) + .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows")) + .setSubtotalsSpec(Arrays.asList( + Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION), + Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION, QueryRunnerTestHelper.QUALITY_DIMENSION) + )) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT)) + .build(); + + Assert.assertEquals(1, GroupByStrategyV2.countRequiredMergeBufferNum(query)); + GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query); + + // 1 for subtotal and 1 for GroupByQueryRunnerFactory#mergeRunners + Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum()); + Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize()); + } + + @Test + public void testSimpleGroupByWithSubtotalsWithoutPrefixMatch() + { + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setDimensions(Arrays.asList( + DefaultDimensionSpec.of(QueryRunnerTestHelper.MARKET_DIMENSION), + DefaultDimensionSpec.of(QueryRunnerTestHelper.PLACEMENT_DIMENSION), + DefaultDimensionSpec.of(QueryRunnerTestHelper.QUALITY_DIMENSION) + )) + .setGranularity(Granularities.ALL) + .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows")) + .setSubtotalsSpec(Arrays.asList( + Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION), + Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.QUALITY_DIMENSION) + )) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT)) + .build(); + + Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query)); 
+ GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query); + + // 2 needed by subtotal and 1 for GroupByQueryRunnerFactory#mergeRunners + Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum()); + Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize()); + } + + @Test + public void testNestedGroupByWithSubtotals() + { + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setGranularity(Granularities.ALL) + .setDimensions(Arrays.asList( + DefaultDimensionSpec.of("quality"), + DefaultDimensionSpec.of("market"), + DefaultDimensionSpec.of("placement") + )) + .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT)) + .build() + ) + ) + .setGranularity(Granularities.ALL) + .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(Arrays.asList( + DefaultDimensionSpec.of("quality"), + DefaultDimensionSpec.of("market") + )) + .setSubtotalsSpec(Arrays.asList( + Collections.singletonList("quality"), + Collections.singletonList("market") + )) + .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows")) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT)) + .build(); + + Assert.assertEquals(3, GroupByStrategyV2.countRequiredMergeBufferNum(query)); + GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query); + + // 2 for subtotal, 1 for nested group by and 1 for GroupByQueryRunnerFactory#mergeRunners + Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum()); + Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize()); + } + + @Test + public void testNestedGroupByWithNestedSubtotals() + { + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setGranularity(Granularities.ALL) + .setDimensions(Arrays.asList( + DefaultDimensionSpec.of("quality"), + DefaultDimensionSpec.of("market"), + DefaultDimensionSpec.of("placement") + )) + .setSubtotalsSpec(Arrays.asList( + Collections.singletonList("quality"), + Collections.singletonList("market") + )) + .setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT)) + .build() + ) + ) + .setGranularity(Granularities.ALL) + .setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(Arrays.asList( + DefaultDimensionSpec.of("quality"), + DefaultDimensionSpec.of("market") + )) + .setSubtotalsSpec(Arrays.asList( + Collections.singletonList("quality"), + Collections.singletonList("market") + )) + .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows")) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT)) + .build(); + + Assert.assertEquals(3, GroupByStrategyV2.countRequiredMergeBufferNum(query)); GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query); - // This should be 0 because the broker needs 2 buffers and the queryable node needs one. 
+ // 2 for subtotal, 1 for nested group by and 1 for GroupByQueryRunnerFactory#mergeRunners Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum()); - Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize()); + Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize()); } } diff --git a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java index 6805c102ba9f..ae0c269adf6b 100644 --- a/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java +++ b/server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java @@ -139,7 +139,10 @@ public void setupTestBase() new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0), new CacheConfig(), new DruidHttpClientConfig(), - QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED), + QueryStackTests.getProcessingConfig( + USE_PARALLEL_MERGE_POOL_CONFIGURED, + DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS + ), ForkJoinPool.commonPool(), QueryStackTests.DEFAULT_NOOP_SCHEDULER, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()) diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 3c6072ad0b15..074649f26a61 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -180,7 +180,10 @@ public static LocalQuerySegmentWalker createLocalQuerySegmentWalker( ); } - public static DruidProcessingConfig getProcessingConfig(boolean useParallelMergePoolConfigured) + public static DruidProcessingConfig getProcessingConfig( + boolean useParallelMergePoolConfigured, + final int mergeBuffers + ) { return new DruidProcessingConfig() { @@ -206,9 +209,10 @@ public int getNumThreads() @Override public int getNumMergeBuffers() { - // Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby. - // Two buffers for the broker and one for the queryable. 
- return 3; + if (mergeBuffers == DEFAULT_NUM_MERGE_BUFFERS) { + return 2; + } + return mergeBuffers; } @Override @@ -230,9 +234,15 @@ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerat public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( final Closer closer, final boolean useParallelMergePoolConfigured + ) { - return createQueryRunnerFactoryConglomerate(closer, getProcessingConfig(useParallelMergePoolConfigured)); + return createQueryRunnerFactoryConglomerate(closer, + getProcessingConfig( + useParallelMergePoolConfigured, + DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS + ) + ); } public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java index 34ef9afe3652..f8da5cbb0fb8 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java @@ -30,6 +30,7 @@ public class PlannerConfig { public static final String CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT = "useApproximateCountDistinct"; + public static final String CTX_KEY_USE_GROUPING_SET_FOR_EXACT_DISTINCT = "useGroupingSetForExactDistinct"; public static final String CTX_KEY_USE_APPROXIMATE_TOPN = "useApproximateTopN"; @JsonProperty @@ -59,6 +60,9 @@ public class PlannerConfig @JsonProperty private long metadataSegmentPollPeriod = 60000; + @JsonProperty + private boolean useGroupingSetForExactDistinct = false; + public long getMetadataSegmentPollPeriod() { return metadataSegmentPollPeriod; @@ -86,6 +90,11 @@ public boolean isUseApproximateCountDistinct() return useApproximateCountDistinct; } + public boolean isUseGroupingSetForExactDistinct() + { + return useGroupingSetForExactDistinct; + } + public boolean isUseApproximateTopN() { return useApproximateTopN; @@ -125,6 +134,11 @@ public PlannerConfig withOverrides(final Map context) CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT, isUseApproximateCountDistinct() ); + newConfig.useGroupingSetForExactDistinct = getContextBoolean( + context, + CTX_KEY_USE_GROUPING_SET_FOR_EXACT_DISTINCT, + isUseGroupingSetForExactDistinct() + ); newConfig.useApproximateTopN = getContextBoolean( context, CTX_KEY_USE_APPROXIMATE_TOPN, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java index f39aab5c24fd..8425d55bc307 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Rules.java @@ -268,9 +268,11 @@ private static List baseRuleSet(final PlannerContext plannerContext) rules.addAll(ABSTRACT_RELATIONAL_RULES); if (!plannerConfig.isUseApproximateCountDistinct()) { - // For some reason, even though we support grouping sets, using AggregateExpandDistinctAggregatesRule.INSTANCE - // here causes CalciteQueryTest#testExactCountDistinctWithGroupingAndOtherAggregators to fail. - rules.add(AggregateExpandDistinctAggregatesRule.JOIN); + if (plannerConfig.isUseGroupingSetForExactDistinct()) { + rules.add(AggregateExpandDistinctAggregatesRule.INSTANCE); + } else { + rules.add(AggregateExpandDistinctAggregatesRule.JOIN); + } } // Rules that we wrote. 
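To see what the rule choice above means in practice, consider a query with two exact distinct counts over the `visits` datasource used by the new tests. With `AggregateExpandDistinctAggregatesRule.JOIN`, each distinct aggregation is computed in its own subquery and joined back together; with `INSTANCE`, both collapse into a single aggregation over grouping sets. Below is a rough conceptual SQL sketch of the grouping-set form only; the actual plan is the native groupBy with a `subtotalsSpec` and a grouping aggregator, shown in the new `CalciteQueryTest` case further down.

```sql
-- Original query (useApproximateCountDistinct = false, useGroupingSetForExactDistinct = true):
SELECT FLOOR(__time TO DAY), COUNT(DISTINCT city), COUNT(DISTINCT "user")
FROM druid.visits
GROUP BY 1;

-- Conceptual rewrite: one pass computes both distinct sets, and GROUPING()
-- tells each outer count which grouping set a row came from.
SELECT
  "day",
  COUNT(city) FILTER (WHERE g = 1),   -- rows from the (day, city) set: distinct cities
  COUNT("user") FILTER (WHERE g = 2)  -- rows from the (day, "user") set: distinct users
FROM (
  SELECT
    FLOOR(__time TO DAY) AS "day",
    city,
    "user",
    GROUPING(FLOOR(__time TO DAY), city, "user") AS g
  FROM druid.visits
  GROUP BY GROUPING SETS ((FLOOR(__time TO DAY), city), (FLOOR(__time TO DAY), "user"))
)
GROUP BY "day";
```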
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index 909287e1d8e5..0392c18958db 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -432,6 +432,12 @@ public void testDatabaseMetaDataTables() throws Exception Pair.of("TABLE_NAME", CalciteTests.SOMEXDATASOURCE), Pair.of("TABLE_SCHEM", "druid"), Pair.of("TABLE_TYPE", "TABLE") + ), + row( + Pair.of("TABLE_CAT", "druid"), + Pair.of("TABLE_NAME", CalciteTests.USERVISITDATASOURCE), + Pair.of("TABLE_SCHEM", "druid"), + Pair.of("TABLE_TYPE", "TABLE") ) ), getRows( @@ -500,6 +506,12 @@ public void testDatabaseMetaDataTablesAsSuperuser() throws Exception Pair.of("TABLE_NAME", CalciteTests.SOMEXDATASOURCE), Pair.of("TABLE_SCHEM", "druid"), Pair.of("TABLE_TYPE", "TABLE") + ), + row( + Pair.of("TABLE_CAT", "druid"), + Pair.of("TABLE_NAME", CalciteTests.USERVISITDATASOURCE), + Pair.of("TABLE_SCHEM", "druid"), + Pair.of("TABLE_TYPE", "TABLE") ) ), getRows( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 8e039263b32e..8396d380043c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -891,4 +891,16 @@ protected Map<String, Object> withLeftDirectAccessEnabled(Map<String, Object> co newContext.put(QueryContexts.SQL_JOIN_LEFT_SCAN_DIRECT, true); return newContext; } + + /** + * Reset the walker and conglomerate with the required number of merge buffers. The default is 2. + */ + protected void requireMergeBuffers(int numMergeBuffers) throws IOException + { + conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate( + resourceCloser, + QueryStackTests.getProcessingConfig(true, numMergeBuffers) + ); + walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder()); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java index 3a125d50eb87..46c39915ff95 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java @@ -20,12 +20,8 @@ package org.apache.druid.sql.calcite; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import junitparams.JUnitParamsRunner; import junitparams.Parameters; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.MapBasedInputRow; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.AllGranularity; import org.apache.druid.query.QueryDataSource; @@ -42,58 +38,21 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.groupby.GroupByQuery; -import org.apache.druid.segment.IndexBuilder; -import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.column.ValueType; -import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.join.JoinType; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; -import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.sql.calcite.expression.DruidExpression; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.LinearShardSpec; -import org.junit.Before; +import org.apache.druid.sql.calcite.util.CalciteTests; import org.junit.Test; import org.junit.runner.RunWith; -import java.io.File; import java.util.Arrays; import java.util.Collections; -import java.util.List; import java.util.Map; @RunWith(JUnitParamsRunner.class) public class CalciteCorrelatedQueryTest extends BaseCalciteQueryTest { - private static final IncrementalIndexSchema INDEX_SCHEMA = new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withRollup(false) - .withMinTimestamp(DateTimes.of("2020-12-31").getMillis()) - .build(); - private static final List DIMENSIONS = ImmutableList.of("user", "country", "city"); - - @Before - public void setup() throws Exception - { - final QueryableIndex index1 = IndexBuilder - .create() - .tmpDir(new File(temporaryFolder.newFolder(), "1")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(INDEX_SCHEMA) - .rows(getRawRows()) - .buildMMappedIndex(); - final DataSegment segment = DataSegment.builder() - .dataSource("visits") - .interval(index1.getDataInterval()) - .version("1") - .shardSpec(new LinearShardSpec(0)) - .size(0) - .build(); - walker.add(segment, index1); - - } @Test @Parameters(source = QueryContextForJoinProvider.class) @@ -115,12 +74,12 @@ public void testCorrelatedSubquery(Map queryContext) throws Exce GroupByQuery.builder() .setDataSource( join( - new TableDataSource("visits"), + new TableDataSource(CalciteTests.USERVISITDATASOURCE), new QueryDataSource( GroupByQuery.builder() .setDataSource( GroupByQuery.builder() - .setDataSource("visits") + .setDataSource(CalciteTests.USERVISITDATASOURCE) .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) .setVirtualColumns(new ExpressionVirtualColumn( "v0", @@ -222,12 +181,12 @@ public void testCorrelatedSubqueryWithLeftFilter(Map queryContex GroupByQuery.builder() .setDataSource( join( - new TableDataSource("visits"), + new TableDataSource(CalciteTests.USERVISITDATASOURCE), new QueryDataSource( GroupByQuery.builder() .setDataSource( GroupByQuery.builder() - .setDataSource("visits") + .setDataSource(CalciteTests.USERVISITDATASOURCE) .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) .setVirtualColumns(new ExpressionVirtualColumn( "v0", @@ -304,7 +263,7 @@ public void testCorrelatedSubqueryWithLeftFilter_leftDirectAccessDisabled(Map GroupByQuery.builder() .setDataSource( join( - new TableDataSource("visits"), + new TableDataSource(CalciteTests.USERVISITDATASOURCE), new QueryDataSource( GroupByQuery.builder() .setDataSource( GroupByQuery.builder() - .setDataSource("visits") + .setDataSource(CalciteTests.USERVISITDATASOURCE) .setQuerySegmentSpec(querySegmentSpec(Intervals.ETERNITY)) .setVirtualColumns(new ExpressionVirtualColumn( "v0", @@ -477,12 +436,12 @@ public void testCorrelatedSubqueryWithCorrelatedQueryFilter_Scan(Map getRawRows() - { - return ImmutableList.of( - toRow("2021-01-01T01:00:00Z", ImmutableMap.of("user", "alice", "country", "canada", "city", "A")), - toRow("2021-01-01T02:00:00Z", ImmutableMap.of("user", "alice", "country", "canada", "city", "B")), - toRow("2021-01-01T03:00:00Z", ImmutableMap.of("user", "bob", "country", "canada", "city", "A")), - toRow("2021-01-01T04:00:00Z", ImmutableMap.of("user", 
"alice", "country", "India", "city", "Y")), - toRow("2021-01-02T01:00:00Z", ImmutableMap.of("user", "alice", "country", "canada", "city", "A")), - toRow("2021-01-02T02:00:00Z", ImmutableMap.of("user", "bob", "country", "canada", "city", "A")), - toRow("2021-01-02T03:00:00Z", ImmutableMap.of("user", "foo", "country", "canada", "city", "B")), - toRow("2021-01-02T04:00:00Z", ImmutableMap.of("user", "bar", "country", "canada", "city", "B")), - toRow("2021-01-02T05:00:00Z", ImmutableMap.of("user", "alice", "country", "India", "city", "X")), - toRow("2021-01-02T06:00:00Z", ImmutableMap.of("user", "bob", "country", "India", "city", "X")), - toRow("2021-01-02T07:00:00Z", ImmutableMap.of("user", "foo", "country", "India", "city", "X")), - toRow("2021-01-03T01:00:00Z", ImmutableMap.of("user", "foo", "country", "USA", "city", "M")) - ); - } - - private MapBasedInputRow toRow(String time, Map event) - { - return new MapBasedInputRow(DateTimes.ISO_DATE_OPTIONAL_TIME.parse(time), DIMENSIONS, event); - } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index f91e7832836a..3e94f3e7d4ed 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -111,6 +111,7 @@ import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.CannotBuildQueryException; import org.apache.druid.sql.calcite.util.CalciteTests; @@ -877,6 +878,7 @@ public void testInformationSchemaTables() throws Exception .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"}) @@ -912,6 +914,7 @@ public void testInformationSchemaTables() throws Exception .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.USERVISITDATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"}) .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"}) @@ -7607,6 +7610,72 @@ public void testExactCountDistinctWithGroupingAndOtherAggregators() throws Excep ); } + @Test + public void testMultipleExactCountDistinctWithGroupingAndOtherAggregators() throws Exception + { + requireMergeBuffers(4); + testQuery( + PLANNER_CONFIG_NO_HLL.withOverrides(ImmutableMap.of(PlannerConfig.CTX_KEY_USE_GROUPING_SET_FOR_EXACT_DISTINCT, "true")), + "SELECT FLOOR(__time to day), COUNT(distinct city), COUNT(distinct user) FROM druid.visits GROUP BY 1", + 
CalciteTests.REGULAR_USER_AUTH_RESULT, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.USERVISITDATASOURCE) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setVirtualColumns(expressionVirtualColumn( + "v0", + "timestamp_floor(\"__time\",'P1D',null,'UTC')", + ValueType.LONG + )) + .setDimensions(dimensions( + new DefaultDimensionSpec("v0", "d0", ValueType.LONG), + new DefaultDimensionSpec("city", "d1"), + new DefaultDimensionSpec("user", "d2") + )) + .setAggregatorSpecs(aggregators(new GroupingAggregatorFactory( + "a0", + Arrays.asList( + "v0", + "city", + "user" + ) + ))) + .setSubtotalsSpec(ImmutableList.of( + ImmutableList.of("d0", "d1"), + ImmutableList.of("d0", "d2") + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.LONG))) + .setAggregatorSpecs(aggregators( + new FilteredAggregatorFactory( + new CountAggregatorFactory("_a0"), + and(not(selector("d1", null, null)), selector("a0", "1", null)) + ), + new FilteredAggregatorFactory( + new CountAggregatorFactory("_a1"), + and(not(selector("d2", null, null)), selector("a0", "2", null)) + ) + )) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{1609459200000L, 3L, 2L}, + new Object[]{1609545600000L, 3L, 4L}, + new Object[]{1609632000000L, 1L, 1L} + ) + ); + } + @Test public void testApproxCountDistinct() throws Exception { @@ -7758,6 +7827,7 @@ public void testNestedGroupBy() throws Exception @Test public void testDoubleNestedGroupBy() throws Exception { + requireMergeBuffers(3); testQuery( "SELECT SUM(cnt), COUNT(*) FROM (\n" + " SELECT dim2, SUM(t1.cnt) cnt FROM (\n" @@ -12517,6 +12587,8 @@ public void testGroupingSets() throws Exception @Test public void testGroupingAggregatorDifferentOrder() throws Exception { + requireMergeBuffers(3); + // Cannot vectorize due to virtual columns. 
cannotVectorize(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index e4a6ec03de92..5a78f0f26bb6 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -35,6 +35,7 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.client.ServerInventoryView; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DoubleDimensionSchema; @@ -160,6 +161,7 @@ public class CalciteTests public static final String SOME_DATASOURCE = "some_datasource"; public static final String SOME_DATSOURCE_ESCAPED = "some\\_datasource"; public static final String SOMEXDATASOURCE = "somexdatasource"; + public static final String USERVISITDATASOURCE = "visits"; public static final String DRUID_SCHEMA_NAME = "druid"; public static final String INFORMATION_SCHEMA_NAME = "INFORMATION_SCHEMA"; public static final String SYSTEM_SCHEMA_NAME = "sys"; @@ -365,6 +367,15 @@ public AuthenticationResult createEscalatedAuthenticationResult() .withRollup(false) .build(); + private static final List<String> USER_VISIT_DIMS = ImmutableList.of("user", "country", "city"); + private static final IncrementalIndexSchema INDEX_SCHEMA_USER_VISIT = new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("cnt") + ) + .withRollup(false) + .withMinTimestamp(DateTimes.of("2020-12-31").getMillis()) + .build(); + public static final List<Map<String, Object>> RAW_ROWS1 = ImmutableList.of( ImmutableMap.<String, Object>builder() .put("t", "2000-01-01") @@ -647,6 +658,33 @@ public AuthenticationResult createEscalatedAuthenticationResult() ) ); + private static List<InputRow> USER_VISIT_ROWS = ImmutableList.of( + toRow( + "2021-01-01T01:00:00Z", + USER_VISIT_DIMS, + ImmutableMap.of("user", "alice", "country", "canada", "city", "A") + ), + toRow( + "2021-01-01T02:00:00Z", + USER_VISIT_DIMS, + ImmutableMap.of("user", "alice", "country", "canada", "city", "B") + ), + toRow("2021-01-01T03:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bob", "country", "canada", "city", "A")), + toRow("2021-01-01T04:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "alice", "country", "India", "city", "Y")), + toRow( + "2021-01-02T01:00:00Z", + USER_VISIT_DIMS, + ImmutableMap.of("user", "alice", "country", "canada", "city", "A") + ), + toRow("2021-01-02T02:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bob", "country", "canada", "city", "A")), + toRow("2021-01-02T03:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "foo", "country", "canada", "city", "B")), + toRow("2021-01-02T04:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bar", "country", "canada", "city", "B")), + toRow("2021-01-02T05:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "alice", "country", "India", "city", "X")), + toRow("2021-01-02T06:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "bob", "country", "India", "city", "X")), + toRow("2021-01-02T07:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "foo", "country", "India", "city", "X")), + toRow("2021-01-03T01:00:00Z", USER_VISIT_DIMS, ImmutableMap.of("user", "foo", "country", "USA", "city", "M")) + ); + private static final InlineDataSource JOINABLE_BACKING_DATA = InlineDataSource.fromIterable( RAW_ROWS1_WITH_NUMERIC_DIMS.stream().map(x -> new Object[]{
x.get("dim1"), @@ -856,6 +894,14 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker( .rows(RAW_ROWS1_X) .buildMMappedIndex(); + final QueryableIndex userVisitIndex = IndexBuilder + .create() + .tmpDir(new File(tmpDir, "8")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema(INDEX_SCHEMA) + .rows(USER_VISIT_ROWS) + .buildMMappedIndex(); + return new SpecificSegmentsQuerySegmentWalker( conglomerate, @@ -943,9 +989,23 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker( .size(0) .build(), indexNumericDims + ).add( + DataSegment.builder() + .dataSource(USERVISITDATASOURCE) + .interval(userVisitIndex.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(), + userVisitIndex ); } + private static MapBasedInputRow toRow(String time, List dimensions, Map event) + { + return new MapBasedInputRow(DateTimes.ISO_DATE_OPTIONAL_TIME.parse(time), dimensions, event); + } + public static ExprMacroTable createExprMacroTable() { final List exprMacros = new ArrayList<>(); diff --git a/website/.spelling b/website/.spelling index 50a22a561fff..c316e51237f4 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1529,6 +1529,7 @@ druid.server.tier druid.sql.planner.maxSemiJoinRowsInMemory druid.sql.planner.sqlTimeZone druid.sql.planner.useApproximateCountDistinct +druid.sql.planner.useGroupingSetForExactDistinct druid.sql.planner.useApproximateTopN error_msg exprs @@ -1561,6 +1562,7 @@ timestamp_expr tls_port total_size useApproximateCountDistinct +useGroupingSetForExactDistinct useApproximateTopN wikipedia - ../docs/querying/timeseriesquery.md