From c0b28aefbf5da0f1b57c3fb01f197d2f408850a3 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 14 Dec 2021 17:47:30 +0530 Subject: [PATCH 1/7] skipEmptyBuckets=true when GroupBy is optimized to Timeseries --- .../druid/sql/calcite/rel/DruidQuery.java | 4 +- .../druid/sql/calcite/rel/Grouping.java | 57 ++++++++++++++++++- .../druid/sql/calcite/CalciteQueryTest.java | 36 ++++++++++-- 3 files changed, 89 insertions(+), 8 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 2292781adaae..3bfbb84a8d14 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -856,7 +856,9 @@ private TimeseriesQuery toTimeseriesQuery(final QueryFeatureInspector queryFeatu // An aggregation query should return one row per group, with no grouping (e.g. ALL granularity), the entire table // is the group, so we should not skip empty buckets. When there are no results, this means we return the // initialized state for given aggregators instead of nothing. 
- if (!Granularities.ALL.equals(queryGranularity)) { + // Alternatively, the timeseries query should return empty buckets, even with ALL granularity when timeseries query + // was originally a groupBy query, but with the grouping dimensions removed away in Grouping#applyProject + if (!Granularities.ALL.equals(queryGranularity) || grouping.isDroppedDimensionsWhileApplyingProject()) { theContext.put(TimeseriesQuery.SKIP_EMPTY_BUCKETS, true); } theContext.putAll(plannerContext.getQueryContext()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java index 0a954729f703..f1cb0db28ba4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java @@ -62,6 +62,7 @@ public class Grouping @Nullable private final DimFilter havingFilter; private final RowSignature outputRowSignature; + private final boolean droppedDimensionsWhileApplyingProject; private Grouping( final List dimensions, @@ -70,12 +71,25 @@ private Grouping( @Nullable final DimFilter havingFilter, final RowSignature outputRowSignature ) + { + this(dimensions, subtotals, aggregations, havingFilter, outputRowSignature, false); + } + + private Grouping( + final List dimensions, + final Subtotals subtotals, + final List aggregations, + @Nullable final DimFilter havingFilter, + final RowSignature outputRowSignature, + final boolean droppedDimensionsWhileApplyingProject + ) { this.dimensions = ImmutableList.copyOf(dimensions); this.subtotals = Preconditions.checkNotNull(subtotals, "subtotals"); this.aggregations = ImmutableList.copyOf(aggregations); this.havingFilter = havingFilter; this.outputRowSignature = Preconditions.checkNotNull(outputRowSignature, "outputRowSignature"); + this.droppedDimensionsWhileApplyingProject = droppedDimensionsWhileApplyingProject; // Verify no collisions between dimensions, aggregations, post-aggregations. 
final Set seen = new HashSet<>(); @@ -103,6 +117,27 @@ private Grouping( } } + // This method is private since droppedDimensionsWhileApplyingProject should only be deviated from default in + // applyProject + private static Grouping create( + final List dimensions, + final Subtotals subtotals, + final List aggregations, + @Nullable final DimFilter havingFilter, + final RowSignature outputRowSignature, + final boolean droppedDimensionsWhileApplyingProject + ) + { + return new Grouping( + dimensions, + subtotals, + aggregations, + havingFilter, + outputRowSignature, + droppedDimensionsWhileApplyingProject + ); + } + public static Grouping create( final List dimensions, final Subtotals subtotals, @@ -160,6 +195,11 @@ public RowSignature getOutputRowSignature() return outputRowSignature; } + public boolean isDroppedDimensionsWhileApplyingProject() + { + return droppedDimensionsWhileApplyingProject; + } + /** * Applies a post-grouping projection. * @@ -187,11 +227,13 @@ public Grouping applyProject(final PlannerContext plannerContext, final Project // actually want to include a dimension 'dummy'. 
final ImmutableBitSet aggregateProjectBits = RelOptUtil.InputFinder.bits(project.getChildExps(), null); final int[] newDimIndexes = new int[dimensions.size()]; + boolean droppedDimensions = false; for (int i = 0; i < dimensions.size(); i++) { final DimensionExpression dimension = dimensions.get(i); if (Parser.parse(dimension.getDruidExpression().getExpression(), plannerContext.getExprMacroTable()) .isLiteral() && !aggregateProjectBits.get(i)) { + droppedDimensions = true; newDimIndexes[i] = -1; } else { newDimIndexes[i] = newDimensions.size(); @@ -225,7 +267,8 @@ public Grouping applyProject(final PlannerContext plannerContext, final Project newSubtotals, newAggregations, havingFilter, - postAggregationProjection.getOutputRowSignature() + postAggregationProjection.getOutputRowSignature(), + droppedDimensions ); } @@ -243,12 +286,20 @@ public boolean equals(Object o) subtotals.equals(grouping.subtotals) && aggregations.equals(grouping.aggregations) && Objects.equals(havingFilter, grouping.havingFilter) && - outputRowSignature.equals(grouping.outputRowSignature); + outputRowSignature.equals(grouping.outputRowSignature) && + droppedDimensionsWhileApplyingProject == grouping.droppedDimensionsWhileApplyingProject; } @Override public int hashCode() { - return Objects.hash(dimensions, subtotals, aggregations, havingFilter, outputRowSignature); + return Objects.hash( + dimensions, + subtotals, + aggregations, + havingFilter, + outputRowSignature, + droppedDimensionsWhileApplyingProject + ); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 90b291289908..a8e7ee7f4945 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -3911,12 +3911,10 @@ public void testGroupByWithFilterMatchingNothingWithGroupByLiteral() throws Exce new CountAggregatorFactory("a0"), new 
LongMaxAggregatorFactory("a1", "cnt") )) - .context(QUERY_CONTEXT_DEFAULT) + .context(QUERY_CONTEXT_DO_SKIP_EMPTY_BUCKETS) .build() ), - ImmutableList.of( - new Object[]{0L, NullHandling.sqlCompatible() ? null : Long.MIN_VALUE} - ) + ImmutableList.of() ); } @@ -13342,4 +13340,34 @@ public void testCommonVirtualExpressionWithDifferentValueType() throws Exception ImmutableList.of() ); } + + // When optimization in Grouping#applyProject is applied, and it reduces a Group By query to a timeseries, we + // want it to return empty bucket if no row matches + @Test + public void testReturnEmptyRowWhenGroupByIsConvertedToTimeseries() throws Exception + { + skipVectorize(); + testQuery( + "SELECT 'A', dim1 from foo WHERE m1 = 50 AND dim1 = 'wat' GROUP BY dim1", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters( + and( + selector("m1", "50", null), + selector("dim1", "wat", null) + ) + ) + .granularity(Granularities.ALL) + .postAggregators( + new ExpressionPostAggregator("p0", "'A'", null, ExprMacroTable.nil()), + new ExpressionPostAggregator("p1", "'wat'", null, ExprMacroTable.nil()) + ) + .context(QUERY_CONTEXT_DO_SKIP_EMPTY_BUCKETS) + .build() + ), + ImmutableList.of() + ); + } } From 4b15f14a5e8cc41ce2698c4a4da98427361c6bfb Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 15 Dec 2021 00:30:02 +0530 Subject: [PATCH 2/7] Trigger Build From 42efe63b55a82a2ebb54515ebd6e26fec1cd65fc Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 15 Dec 2021 09:18:09 +0530 Subject: [PATCH 3/7] Trigger Build From 6c23bd6875b28bda151ed75825422df7cf91e632 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 15 Dec 2021 12:41:08 +0530 Subject: [PATCH 4/7] Trigger Build From 3b94cbfa55f62b1d676db95d82d6903913de5aed Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 4 Jan 2022 11:45:49 +0530 Subject: [PATCH 5/7] Rename the flag --- .../druid/sql/calcite/rel/Grouping.java 
| 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java index f1cb0db28ba4..77fada7f6f41 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java @@ -62,7 +62,11 @@ public class Grouping @Nullable private final DimFilter havingFilter; private final RowSignature outputRowSignature; - private final boolean droppedDimensionsWhileApplyingProject; + + // Denotes whether the original Grouping had more dimensions which were dropped while applying projection to optimize + // the grouping. Used for returning result which is consistent with most SQL implementations, by correspondingly + // setting/unsetting the SKIP_EMPTY_BUCKETS flag, if the GroupBy query can be reduced to a timeseries query. + private final boolean optimizedWhileGrouping; private Grouping( final List dimensions, @@ -81,7 +85,7 @@ private Grouping( final List aggregations, @Nullable final DimFilter havingFilter, final RowSignature outputRowSignature, - final boolean droppedDimensionsWhileApplyingProject + final boolean optimizedWhileGrouping ) { this.dimensions = ImmutableList.copyOf(dimensions); @@ -89,7 +93,7 @@ private Grouping( this.aggregations = ImmutableList.copyOf(aggregations); this.havingFilter = havingFilter; this.outputRowSignature = Preconditions.checkNotNull(outputRowSignature, "outputRowSignature"); - this.droppedDimensionsWhileApplyingProject = droppedDimensionsWhileApplyingProject; + this.optimizedWhileGrouping = optimizedWhileGrouping; // Verify no collisions between dimensions, aggregations, post-aggregations. 
final Set seen = new HashSet<>(); @@ -117,7 +121,7 @@ private Grouping( } } - // This method is private since droppedDimensionsWhileApplyingProject should only be deviated from default in + // This method is private since optimizedWhileGrouping should only be deviated from default in // applyProject private static Grouping create( final List dimensions, @@ -125,7 +129,7 @@ private static Grouping create( final List aggregations, @Nullable final DimFilter havingFilter, final RowSignature outputRowSignature, - final boolean droppedDimensionsWhileApplyingProject + final boolean optimizedWhileGrouping ) { return new Grouping( @@ -134,7 +138,7 @@ private static Grouping create( aggregations, havingFilter, outputRowSignature, - droppedDimensionsWhileApplyingProject + optimizedWhileGrouping ); } @@ -195,9 +199,9 @@ public RowSignature getOutputRowSignature() return outputRowSignature; } - public boolean isDroppedDimensionsWhileApplyingProject() + public boolean isOptimizedWhileGrouping() { - return droppedDimensionsWhileApplyingProject; + return optimizedWhileGrouping; } /** @@ -287,7 +291,7 @@ public boolean equals(Object o) aggregations.equals(grouping.aggregations) && Objects.equals(havingFilter, grouping.havingFilter) && outputRowSignature.equals(grouping.outputRowSignature) && - droppedDimensionsWhileApplyingProject == grouping.droppedDimensionsWhileApplyingProject; + optimizedWhileGrouping == grouping.optimizedWhileGrouping; } @Override @@ -299,7 +303,7 @@ public int hashCode() aggregations, havingFilter, outputRowSignature, - droppedDimensionsWhileApplyingProject + optimizedWhileGrouping ); } } From c1417d808fa8ceb34efe781021044d7ed98b2785 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 4 Jan 2022 11:46:51 +0530 Subject: [PATCH 6/7] Rename the flag, DruidQuery --- .../main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 3bfbb84a8d14..ca6fbf416b74 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -858,7 +858,7 @@ private TimeseriesQuery toTimeseriesQuery(final QueryFeatureInspector queryFeatu // initialized state for given aggregators instead of nothing. // Alternatively, the timeseries query should return empty buckets, even with ALL granularity when timeseries query // was originally a groupBy query, but with the grouping dimensions removed away in Grouping#applyProject - if (!Granularities.ALL.equals(queryGranularity) || grouping.isDroppedDimensionsWhileApplyingProject()) { + if (!Granularities.ALL.equals(queryGranularity) || grouping.isOptimizedWhileGrouping()) { theContext.put(TimeseriesQuery.SKIP_EMPTY_BUCKETS, true); } theContext.putAll(plannerContext.getQueryContext()); From 8a4d8e60c625ace2efc50a135e76e0cf64ea1276 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 7 Jan 2022 11:50:56 +0530 Subject: [PATCH 7/7] Add test cases, improve flagname --- .../druid/sql/calcite/rel/DruidQuery.java | 2 +- .../druid/sql/calcite/rel/Grouping.java | 20 ++--- .../druid/sql/calcite/CalciteQueryTest.java | 79 ++++++++++++++++++- 3 files changed, 89 insertions(+), 12 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index ca6fbf416b74..0def266b266f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -858,7 +858,7 @@ private TimeseriesQuery toTimeseriesQuery(final QueryFeatureInspector queryFeatu // initialized state for given aggregators instead of nothing. 
// Alternatively, the timeseries query should return empty buckets, even with ALL granularity when timeseries query // was originally a groupBy query, but with the grouping dimensions removed away in Grouping#applyProject - if (!Granularities.ALL.equals(queryGranularity) || grouping.isOptimizedWhileGrouping()) { + if (!Granularities.ALL.equals(queryGranularity) || grouping.hasGroupingDimensionsDropped()) { theContext.put(TimeseriesQuery.SKIP_EMPTY_BUCKETS, true); } theContext.putAll(plannerContext.getQueryContext()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java index 77fada7f6f41..d6931fca47b2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java @@ -66,7 +66,7 @@ public class Grouping // Denotes whether the original Grouping had more dimensions which were dropped while applying projection to optimize // the grouping. Used for returning result which is consistent with most SQL implementations, by correspondingly // setting/unsetting the SKIP_EMPTY_BUCKETS flag, if the GroupBy query can be reduced to a timeseries query. 
- private final boolean optimizedWhileGrouping; + private final boolean groupingDimensionsDropped; private Grouping( final List dimensions, @@ -85,7 +85,7 @@ private Grouping( final List aggregations, @Nullable final DimFilter havingFilter, final RowSignature outputRowSignature, - final boolean optimizedWhileGrouping + final boolean groupingDimensionsDropped ) { this.dimensions = ImmutableList.copyOf(dimensions); @@ -93,7 +93,7 @@ private Grouping( this.aggregations = ImmutableList.copyOf(aggregations); this.havingFilter = havingFilter; this.outputRowSignature = Preconditions.checkNotNull(outputRowSignature, "outputRowSignature"); - this.optimizedWhileGrouping = optimizedWhileGrouping; + this.groupingDimensionsDropped = groupingDimensionsDropped; // Verify no collisions between dimensions, aggregations, post-aggregations. final Set seen = new HashSet<>(); @@ -121,7 +121,7 @@ private Grouping( } } - // This method is private since optimizedWhileGrouping should only be deviated from default in + // This method is private since groupingDimensionsDropped should only be deviated from default in // applyProject private static Grouping create( final List dimensions, @@ -129,7 +129,7 @@ private static Grouping create( final List aggregations, @Nullable final DimFilter havingFilter, final RowSignature outputRowSignature, - final boolean optimizedWhileGrouping + final boolean groupingDimensionsDropped ) { return new Grouping( @@ -138,7 +138,7 @@ private static Grouping create( aggregations, havingFilter, outputRowSignature, - optimizedWhileGrouping + groupingDimensionsDropped ); } @@ -199,9 +199,9 @@ public RowSignature getOutputRowSignature() return outputRowSignature; } - public boolean isOptimizedWhileGrouping() + public boolean hasGroupingDimensionsDropped() { - return optimizedWhileGrouping; + return groupingDimensionsDropped; } /** @@ -291,7 +291,7 @@ public boolean equals(Object o) aggregations.equals(grouping.aggregations) && Objects.equals(havingFilter, 
grouping.havingFilter) && outputRowSignature.equals(grouping.outputRowSignature) && - optimizedWhileGrouping == grouping.optimizedWhileGrouping; + groupingDimensionsDropped == grouping.groupingDimensionsDropped; } @Override @@ -303,7 +303,7 @@ public int hashCode() aggregations, havingFilter, outputRowSignature, - optimizedWhileGrouping + groupingDimensionsDropped ); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index a8e7ee7f4945..429acfb917bf 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -13344,7 +13344,59 @@ public void testCommonVirtualExpressionWithDifferentValueType() throws Exception // When optimization in Grouping#applyProject is applied, and it reduces a Group By query to a timeseries, we // want it to return empty bucket if no row matches @Test - public void testReturnEmptyRowWhenGroupByIsConvertedToTimeseries() throws Exception + public void testReturnEmptyRowWhenGroupByIsConvertedToTimeseriesWithSingleConstantDimension() throws Exception + { + skipVectorize(); + testQuery( + "SELECT 'A' from foo WHERE m1 = 50 AND dim1 = 'wat' GROUP BY 'foobar'", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters( + and( + selector("m1", "50", null), + selector("dim1", "wat", null) + ) + ) + .granularity(Granularities.ALL) + .postAggregators( + new ExpressionPostAggregator("p0", "'A'", null, ExprMacroTable.nil()) + ) + .context(QUERY_CONTEXT_DO_SKIP_EMPTY_BUCKETS) + .build() + ), + ImmutableList.of() + ); + + + // dim1 is not getting reduced to 'wat' in this case in Calcite (ProjectMergeRule is not getting applied), + // therefore the query is not optimized to a timeseries query + testQuery( + "SELECT 'A' from foo WHERE dim1 = 'wat' GROUP 
BY dim1", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + "foo" + ) + .setInterval(querySegmentSpec(Intervals.ETERNITY)) + .setGranularity(Granularities.ALL) + .addDimension(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING)) + .setDimFilter(selector("dim1", "wat", null)) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("p0", "'A'", null, ExprMacroTable.nil()) + ) + ) + + .build() + ), + ImmutableList.of() + ); + } + + @Test + public void testReturnEmptyRowWhenGroupByIsConvertedToTimeseriesWithMutlipleConstantDimensions() throws Exception { skipVectorize(); testQuery( @@ -13369,5 +13421,30 @@ public void testReturnEmptyRowWhenGroupByIsConvertedToTimeseries() throws Except ), ImmutableList.of() ); + + // Sanity test: even when the grouping dimensions are dropped, a query whose filters match rows should still + // produce a non-empty result + testQuery( + "SELECT 'A', dim1 from foo WHERE m1 = 2.0 AND dim1 = '10.1' GROUP BY dim1", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters( + and( + selector("m1", "2.0", null), + selector("dim1", "10.1", null) + ) + ) + .granularity(Granularities.ALL) + .postAggregators( + new ExpressionPostAggregator("p0", "'A'", null, ExprMacroTable.nil()), + new ExpressionPostAggregator("p1", "'10.1'", null, ExprMacroTable.nil()) + ) + .context(QUERY_CONTEXT_DO_SKIP_EMPTY_BUCKETS) + .build() + ), + ImmutableList.of(new Object[]{"A", "10.1"}) + ); } }