From 62ee8b7968d9f1ae835ad75059b980cc4ad53a70 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 26 Mar 2025 13:45:51 -0700 Subject: [PATCH] GroupBy: Fix offsets on outer queries. Prior to this patch, an offset specified on a groupBy that itself has an inner groupBy would lead to an error like "Cannot push down offsets". This happened because of a violated assumption: the processing logic assumes that offsets have been pushed into limits (so limit pushdown optimizations can safely be used). This patch adjusts processing to incorporate offsets into limits during processing of subqueries. Later on, in post-processing, offsets are applied as written. --- .../druid/query/groupby/GroupingEngine.java | 9 ++++- .../query/groupby/GroupByQueryRunnerTest.java | 33 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index 91629706788b..3f85bbf63e1f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -597,7 +597,7 @@ public Sequence processSubqueryResult( GroupByRowProcessor.ResultSupplier resultSupplier = null; try { - final GroupByQuery queryToRun; + GroupByQuery queryToRun; if (wasQueryPushedDown) { // If the query was pushed down, filters would have been applied downstream, so skip it here. @@ -607,6 +607,13 @@ public Sequence processSubqueryResult( queryToRun = query; } + if (queryToRun.getLimitSpec() instanceof DefaultLimitSpec) { + // If the query has an offset, incorporate it into the limit before processing subquery results. + // This allows limit pushdown to work properly during processing. Later on, we'll use the GroupByQuery's + // postProcessingFn to apply the offset. 
+ queryToRun = queryToRun.withLimitSpec(((DefaultLimitSpec) queryToRun.getLimitSpec()).withOffsetToLimit()); + } + resultSupplier = GroupByRowProcessor.process( queryToRun, wasQueryPushedDown ? queryToRun : subquery, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 300a74c8d2d3..67e4c5b1abe0 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -6569,6 +6569,39 @@ public void testSubqueryWithPostAggregators() TestHelper.assertExpectedObjects(expectedResults, results, "subquery-postaggs"); } + @Test + public void testSubqueryWithOuterOffsetAndLimit() + { + // Granularity != ALL requires time-ordering. + assumeTimeOrdered(); + + final GroupByQuery subquery = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .build(); + + final GroupByQuery query = makeQueryBuilder() + .setDataSource(subquery) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(new DefaultDimensionSpec("alias", "alias")) + .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows")) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .setLimitSpec(new DefaultLimitSpec(null, 1, 2)) + .build(); + + List expectedResults = Arrays.asList( + makeRow(query, "2011-04-01", "alias", "business", "rows", 1L), + makeRow(query, "2011-04-01", "alias", "entertainment", "rows", 1L) + ); + + // Subqueries are handled by the ToolChest + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, 
results, "subquery-offset-limit"); + } + @Test public void testSubqueryWithPostAggregatorsAndHaving() {