Merged
1 change: 1 addition & 0 deletions docs/configuration/index.md
@@ -1675,6 +1675,7 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
|`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|true|
|`druid.sql.planner.useGroupingSetForExactDistinct`|Only relevant when `useApproximateCountDistinct` is disabled. If set to true, exact distinct queries are rewritten using grouping sets; otherwise, they are rewritten using joins. This should be set to true for groupBy queries with multiple exact distinct aggregations. This flag can be overridden per query.|false|
Member:
Naively this seems better than using joins... is the reason for making it false by default in case there are any regressions, I guess? I only ask because things that are cool but off by default tend to take a long time to be turned on, if ever.

Contributor Author:

I didn't want to accidentally break any queries that are already running. At least for the backports, we do want to keep it off by default, but maybe turn it on for new releases?

Member:

I think it would be OK to leave it off by default for the next release, and maybe consider turning it on in the release after.

|`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN queries](../querying/topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](../querying/groupbyquery.md) will be used instead.|true|
|`druid.sql.planner.requireTimeCondition`|Whether to require SQL to have filter conditions on __time column so that all generated native queries will have user specified intervals. If true, all queries without filter condition on __time column will fail|false|
|`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC|
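To make the trade-off behind `useGroupingSetForExactDistinct` concrete, here is a rough sketch (plain Python with made-up data and helper names — not Druid code) of the two rewrite strategies for a query with multiple exact distinct aggregations:

```python
from collections import defaultdict

# Hypothetical rows; column names are illustrative only.
ROWS = [
    {"country": "US", "user": "a", "device": "phone"},
    {"country": "US", "user": "b", "device": "phone"},
    {"country": "FR", "user": "a", "device": "tablet"},
    {"country": "FR", "user": "a", "device": "phone"},
]

def distinct_via_joins(rows, cols):
    # Join-based rewrite: one "subquery" (i.e. one separate scan) per
    # distinct aggregation, with the per-column results combined afterwards.
    counts = {}
    for col in cols:
        counts[col] = len({row[col] for row in rows})
    return counts

def distinct_via_grouping_sets(rows, cols):
    # Grouping-set rewrite: a single scan expands each row into one entry
    # per grouping set — conceptually GROUPING SETS ((user), (device), ...) —
    # so every exact distinct count falls out of one deduplication pass.
    seen = defaultdict(set)
    for row in rows:
        for col in cols:
            seen[col].add(row[col])
    return {col: len(vals) for col, vals in seen.items()}
```

Both strategies return the same exact counts; the grouping-set plan avoids a separate join branch per distinct aggregation, which is why it helps when there are several of them.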
3 changes: 2 additions & 1 deletion docs/querying/sql.md
@@ -322,7 +322,7 @@ Only the COUNT aggregation can accept DISTINCT.
|Function|Notes|
|--------|-----|
|`COUNT(*)`|Counts the number of rows.|
|`COUNT(DISTINCT expr)`|Counts distinct values of expr, which can be string, numeric, or hyperUnique. By default this is approximate, using a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). To get exact counts set "useApproximateCountDistinct" to "false". If you do this, expr must be string or numeric, since exact counts are not possible using hyperUnique columns. See also `APPROX_COUNT_DISTINCT(expr)`. In exact mode, only one distinct count per query is permitted.|
|`COUNT(DISTINCT expr)`|Counts distinct values of expr, which can be string, numeric, or hyperUnique. By default this is approximate, using a variant of [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf). To get exact counts set "useApproximateCountDistinct" to "false". If you do this, expr must be string or numeric, since exact counts are not possible using hyperUnique columns. See also `APPROX_COUNT_DISTINCT(expr)`. In exact mode, only one distinct count per query is permitted unless `useGroupingSetForExactDistinct` is set to true in the query context or Broker configuration.|
|`SUM(expr)`|Sums numbers.|
|`MIN(expr)`|Takes the minimum of numbers.|
|`MAX(expr)`|Takes the maximum of numbers.|
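The approximate default for `COUNT(DISTINCT expr)` can be illustrated with a minimal HyperLogLog estimator — a toy Python sketch only; Druid's actual implementation differs in many details:

```python
import hashlib
import math

def hll_estimate(items, p=12):
    """Estimate the number of distinct items using 2**p one-byte registers."""
    m = 1 << p
    registers = [0] * m
    for item in items:
        # Deterministic 64-bit hash of the item.
        h = int(hashlib.sha1(str(item).encode()).hexdigest(), 16) & ((1 << 64) - 1)
        bucket = h >> (64 - p)                   # first p bits pick a register
        rest = h & ((1 << (64 - p)) - 1)         # remaining bits give the rank
        rank = (64 - p) - rest.bit_length() + 1  # leading zeros + 1
        registers[bucket] = max(registers[bucket], rank)
    alpha = 0.7213 / (1 + 1.079 / m)
    estimate = alpha * m * m / sum(2.0 ** -r for r in registers)
    zeros = registers.count(0)
    if estimate <= 2.5 * m and zeros:            # small-range correction
        estimate = m * math.log(m / zeros)
    return estimate

exact = len(set(range(100_000)))
approx = hll_estimate(range(100_000))   # typically within a few percent of exact
```

The sketch uses a fixed, small amount of memory regardless of cardinality, which is the reason the approximate mode is the default; the price is a small relative error (roughly `1.04 / sqrt(2**p)` for standard HyperLogLog).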
@@ -1007,6 +1007,7 @@ Connection context can be specified as JDBC connection properties or as a "conte
|`sqlQueryId`|Unique identifier given to this SQL query. For HTTP client, it will be returned in `X-Druid-SQL-Query-Id` header.|auto-generated|
|`sqlTimeZone`|Sets the time zone for this connection, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|druid.sql.planner.sqlTimeZone on the Broker (default: UTC)|
|`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the Broker (default: true)|
|`useGroupingSetForExactDistinct`|Whether to use grouping sets to execute queries with multiple exact distinct aggregations.|druid.sql.planner.useGroupingSetForExactDistinct on the Broker (default: false)|
|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.md) will be used instead.|druid.sql.planner.useApproximateTopN on the Broker (default: true)|

## Metadata tables
@@ -20,6 +20,7 @@
package org.apache.druid.query.groupby.strategy;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
@@ -85,8 +86,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";
public static final String CTX_KEY_OUTERMOST = "groupByOutermost";

// see countRequiredMergeBufferNum() for explanation
private static final int MAX_MERGE_BUFFER_NUM = 2;
// see countRequiredMergeBufferNumWithoutSubtotal() for explanation
private static final int MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL = 2;

private final DruidProcessingConfig processingConfig;
private final Supplier<GroupByQueryConfig> configSupplier;
@@ -116,8 +117,7 @@ public GroupByStrategyV2(
@Override
public GroupByQueryResource prepareResource(GroupByQuery query)
{
final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1) +
numMergeBuffersNeededForSubtotalsSpec(query);
final int requiredMergeBufferNum = countRequiredMergeBufferNum(query);

if (requiredMergeBufferNum > mergeBufferPool.maxSize()) {
throw new ResourceLimitExceededException(
@@ -146,7 +146,13 @@ public GroupByQueryResource prepareResource(GroupByQuery query)
}
}

private static int countRequiredMergeBufferNum(Query query, int foundNum)
@VisibleForTesting
public static int countRequiredMergeBufferNum(GroupByQuery query)
{
return countRequiredMergeBufferNumWithoutSubtotal(query, 1) + numMergeBuffersNeededForSubtotalsSpec(query);
}

private static int countRequiredMergeBufferNumWithoutSubtotal(Query query, int foundNum)
{
// Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one.
// For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1.
@@ -156,10 +162,10 @@ private static int countRequiredMergeBufferNum(Query query, int foundNum)
// This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2.

final DataSource dataSource = query.getDataSource();
if (foundNum == MAX_MERGE_BUFFER_NUM + 1 || !(dataSource instanceof QueryDataSource)) {
if (foundNum == MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL + 1 || !(dataSource instanceof QueryDataSource)) {
return foundNum - 1;
} else {
return countRequiredMergeBufferNum(((QueryDataSource) dataSource).getQuery(), foundNum + 1);
return countRequiredMergeBufferNumWithoutSubtotal(((QueryDataSource) dataSource).getQuery(), foundNum + 1);
}
}

@@ -522,11 +528,20 @@ private Set<String> getAggregatorAndPostAggregatorNames(GroupByQuery query)
return aggsAndPostAggs;
}

private int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query)
private static int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query)
{
List<List<String>> subtotalSpecs = query.getSubtotalsSpec();
final DataSource dataSource = query.getDataSource();
int numMergeBuffersNeededForSubQuerySubtotal = 0;
if (dataSource instanceof QueryDataSource) {
Query<?> subQuery = ((QueryDataSource) dataSource).getQuery();
if (subQuery instanceof GroupByQuery) {
numMergeBuffersNeededForSubQuerySubtotal = numMergeBuffersNeededForSubtotalsSpec((GroupByQuery) subQuery);
}

}
if (subtotalSpecs == null || subtotalSpecs.size() == 0) {
return 0;
return numMergeBuffersNeededForSubQuerySubtotal;
}

List<String> queryDimOutputNames = query.getDimensions().stream().map(DimensionSpec::getOutputName).collect(
@@ -537,7 +552,7 @@ private int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query)
}
}

return 1;
return Math.max(1, numMergeBuffersNeededForSubQuerySubtotal);
}
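The combined merge-buffer accounting in this patch (nesting depth plus subtotals, including subtotals on a subquery) can be sketched as a simplified model — Python pseudocode where queries are reduced to dicts carrying only nesting and subtotal structure, not the real Druid API:

```python
MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL = 2

def buffers_without_subtotal(query, found=1):
    # One buffer per groupBy layer above the inner-most one, capped at 2.
    inner = query.get("datasource")
    if found == MAX_MERGE_BUFFER_NUM_WITHOUT_SUBTOTAL + 1 or inner is None:
        return found - 1
    return buffers_without_subtotal(inner, found + 1)

def buffers_for_subtotals(query):
    # Mirrors numMergeBuffersNeededForSubtotalsSpec: recurse into the
    # subquery first, then account for this layer's subtotalsSpec.
    inner = query.get("datasource")
    inner_needs = buffers_for_subtotals(inner) if inner else 0
    specs = query.get("subtotals") or []
    if not specs:
        return inner_needs
    dims = query["dims"]
    for spec in specs:
        if dims[:len(spec)] != spec:
            return 2   # a non-prefix subtotal needs a second buffer
    return max(1, inner_needs)

def required_merge_buffers(query):
    return buffers_without_subtotal(query) + buffers_for_subtotals(query)
```

For example, a nested groupBy whose outer query has non-prefix subtotals needs 1 + 2 = 3 buffers in this model, matching the assertion added in `testNestedGroupByWithSubtotals`.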

@Override
@@ -51,6 +51,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -109,7 +110,7 @@ public int intermediateComputeSizeBytes()
@Override
public int getNumMergeBuffers()
{
return 3;
return 4;
}

@Override
@@ -211,10 +212,11 @@ public void testSimpleGroupBy()
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();

Assert.assertEquals(0, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);

Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}

@Test
@@ -239,10 +241,11 @@ public void testNestedGroupBy()
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();

Assert.assertEquals(1, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);

Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}

@Test
@@ -278,11 +281,12 @@ public void testDoubleNestedGroupBy()
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();

Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);

// This should be 0 because the broker needs 2 buffers and the queryable node needs one.
Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
// This should be 1 because the broker needs 2 buffers and the queryable node needs one.
Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}

@Test
@@ -331,10 +335,157 @@ public void testTripleNestedGroupBy()
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();

Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);

// This should be 1 because the broker needs 2 buffers and the queryable node needs one.
Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}

@Test
public void testSimpleGroupByWithSubtotals()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of(QueryRunnerTestHelper.MARKET_DIMENSION),
DefaultDimensionSpec.of(QueryRunnerTestHelper.PLACEMENT_DIMENSION),
DefaultDimensionSpec.of(QueryRunnerTestHelper.QUALITY_DIMENSION)
))
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
.setSubtotalsSpec(Arrays.asList(
Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION),
Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION, QueryRunnerTestHelper.QUALITY_DIMENSION)
))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();

Assert.assertEquals(1, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);

// 1 for subtotal and 1 for GroupByQueryRunnerFactory#mergeRunners
Assert.assertEquals(2, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}

@Test
public void testSimpleGroupByWithSubtotalsWithoutPrefixMatch()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of(QueryRunnerTestHelper.MARKET_DIMENSION),
DefaultDimensionSpec.of(QueryRunnerTestHelper.PLACEMENT_DIMENSION),
DefaultDimensionSpec.of(QueryRunnerTestHelper.QUALITY_DIMENSION)
))
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
.setSubtotalsSpec(Arrays.asList(
Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENT_DIMENSION),
Arrays.asList(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.QUALITY_DIMENSION)
))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();

Assert.assertEquals(2, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);

// 2 needed by subtotal and 1 for GroupByQueryRunnerFactory#mergeRunners
Assert.assertEquals(1, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}

@Test
public void testNestedGroupByWithSubtotals()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setGranularity(Granularities.ALL)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of("quality"),
DefaultDimensionSpec.of("market"),
DefaultDimensionSpec.of("placement")
))
.setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
.build()
)
)
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of("quality"),
DefaultDimensionSpec.of("market")
))
.setSubtotalsSpec(Arrays.asList(
Collections.singletonList("quality"),
Collections.singletonList("market")
))
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();

Assert.assertEquals(3, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);

// 2 for subtotal, 1 for nested group by and 1 for GroupByQueryRunnerFactory#mergeRunners
Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}

@Test
public void testNestedGroupByWithNestedSubtotals()
{
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setGranularity(Granularities.ALL)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of("quality"),
DefaultDimensionSpec.of("market"),
DefaultDimensionSpec.of("placement")
))
.setSubtotalsSpec(Arrays.asList(
Collections.singletonList("quality"),
Collections.singletonList("market")
))
.setAggregatorSpecs(Collections.singletonList(QueryRunnerTestHelper.ROWS_COUNT))
.build()
)
)
.setGranularity(Granularities.ALL)
.setInterval(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(Arrays.asList(
DefaultDimensionSpec.of("quality"),
DefaultDimensionSpec.of("market")
))
.setSubtotalsSpec(Arrays.asList(
Collections.singletonList("quality"),
Collections.singletonList("market")
))
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
.setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, TIMEOUT))
.build();

Assert.assertEquals(3, GroupByStrategyV2.countRequiredMergeBufferNum(query));
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);

// This should be 0 because the broker needs 2 buffers and the queryable node needs one.
// 2 for subtotal, 1 for nested group by and 1 for GroupByQueryRunnerFactory#mergeRunners
Assert.assertEquals(0, MERGE_BUFFER_POOL.getMinRemainBufferNum());
Assert.assertEquals(3, MERGE_BUFFER_POOL.getPoolSize());
Assert.assertEquals(4, MERGE_BUFFER_POOL.getPoolSize());
}
}
@@ -139,7 +139,10 @@ public void setupTestBase()
new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig(),
QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
QueryStackTests.getProcessingConfig(
USE_PARALLEL_MERGE_POOL_CONFIGURED,
DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
),
ForkJoinPool.commonPool(),
QueryStackTests.DEFAULT_NOOP_SCHEDULER,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())
20 changes: 15 additions & 5 deletions server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -180,7 +180,10 @@ public static LocalQuerySegmentWalker createLocalQuerySegmentWalker(
);
}

public static DruidProcessingConfig getProcessingConfig(boolean useParallelMergePoolConfigured)
public static DruidProcessingConfig getProcessingConfig(
boolean useParallelMergePoolConfigured,
final int mergeBuffers
)
{
return new DruidProcessingConfig()
{
@@ -206,9 +209,10 @@ public int getNumThreads()
@Override
public int getNumMergeBuffers()
{
// Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
// Two buffers for the broker and one for the queryable.
return 3;
if (mergeBuffers == DEFAULT_NUM_MERGE_BUFFERS) {
return 2;
}
return mergeBuffers;
}

@Override
@@ -230,9 +234,15 @@ public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerat
public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(
final Closer closer,
final boolean useParallelMergePoolConfigured

)
{
return createQueryRunnerFactoryConglomerate(closer, getProcessingConfig(useParallelMergePoolConfigured));
return createQueryRunnerFactoryConglomerate(closer,
getProcessingConfig(
useParallelMergePoolConfigured,
DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS
)
);
}

public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(