From cae81a7cdc3d297abe5f2fa7f05b8dec67e35f12 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 25 Oct 2023 10:53:46 +0530 Subject: [PATCH 1/3] Fix summary row issues in case postaggregations are happening (apache#15232) --- .../msq/test/CalciteSelectQueryMSQTest.java | 12 ++ .../druid/query/groupby/GroupingEngine.java | 14 +- .../query/groupby/GroupByQueryRunnerTest.java | 7 +- .../sql/calcite/planner/CalcitePlanner.java | 3 +- .../calcite/planner/DruidSqlValidator.java | 17 ++- .../sql/calcite/planner/PlannerFactory.java | 7 +- .../druid/sql/calcite/CalciteQueryTest.java | 144 ++++++++++++++++++ .../sql/calcite/CalciteSysQueryTest.java | 3 + 8 files changed, 199 insertions(+), 8 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java index e9c54cfc54b4..c83ec91f454f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java @@ -157,6 +157,18 @@ public void testQueryWithMoreThanMaxNumericInFilter() } + @Ignore + @Override + public void testUnSupportedNullsFirst() + { + } + + @Ignore + @Override + public void testUnSupportedNullsLast() + { + } + /** * Same query as {@link CalciteQueryTest#testArrayAggQueryOnComplexDatatypes}. ARRAY_AGG is not supported in MSQ currently. * Once support is added, this test can be removed and msqCompatible() can be added to the one in CalciteQueryTest. diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index b242ff98555a..40c336f7a71c 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -788,11 +788,19 @@ private static boolean summaryRowPreconditions(GroupByQuery query) private static Iterator summaryRowIterator(GroupByQuery q) { List aggSpec = q.getAggregatorSpecs(); - Object[] values = new Object[aggSpec.size()]; + ResultRow resultRow = ResultRow.create(q.getResultRowSizeWithPostAggregators()); for (int i = 0; i < aggSpec.size(); i++) { - values[i] = aggSpec.get(i).factorize(new AllNullColumnSelectorFactory()).get(); + resultRow.set(i, aggSpec.get(i).factorize(new AllNullColumnSelectorFactory()).get()); } - return Collections.singleton(ResultRow.of(values)).iterator(); + Map map = resultRow.toMap(q); + for (int i = 0; i < q.getPostAggregatorSpecs().size(); i++) { + final PostAggregator postAggregator = q.getPostAggregatorSpecs().get(i); + final Object value = postAggregator.compute(map); + + resultRow.set(q.getResultRowPostAggregatorStart() + i, value); + map.put(postAggregator.getName(), value); + } + return Collections.singleton(resultRow).iterator(); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index c8f5aa7dfcac..e9cd4d0c85e6 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -12962,6 +12962,9 @@ public void testSummaryrowForEmptyInput() new FloatSumAggregatorFactory("idxFloat", "indexFloat"), new DoubleSumAggregatorFactory("idxDouble", "index") ) + .setPostAggregatorSpecs( + ImmutableList.of( + new ExpressionPostAggregator("post", "idx * 2", null, TestExprMacroTable.INSTANCE))) .setGranularity(QueryRunnerTestHelper.ALL_GRAN) .build(); @@ -12976,7 +12979,9 @@ public void testSummaryrowForEmptyInput() "idxFloat", NullHandling.replaceWithDefault() ? 0.0 : null, "idxDouble", - NullHandling.replaceWithDefault() ? 0.0 : null + NullHandling.replaceWithDefault() ? 0.0 : null, + "post", + NullHandling.replaceWithDefault() ? 0L : null ) ); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java index 1abec772e313..2cda011848b5 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalcitePlanner.java @@ -407,7 +407,8 @@ private SqlValidator createSqlValidator(CalciteCatalogReader catalogReader) opTab, catalogReader, getTypeFactory(), - validatorConfig + validatorConfig, + context.unwrapOrThrow(PlannerContext.class) ); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java index bbc4f99a1316..2844350241b9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java @@ -30,6 +30,8 @@ import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.validate.SqlValidatorScope; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.sql.calcite.run.EngineFeature; /** * Druid extended SQL validator. (At present, it doesn't actually @@ -37,19 +39,32 @@ */ class DruidSqlValidator extends BaseDruidSqlValidator { + private final PlannerContext plannerContext; + protected DruidSqlValidator( SqlOperatorTable opTab, CalciteCatalogReader catalogReader, JavaTypeFactory typeFactory, - Config validatorConfig + Config validatorConfig, + PlannerContext plannerContext ) { super(opTab, catalogReader, typeFactory, validatorConfig); + this.plannerContext = plannerContext; } @Override public void validateCall(SqlCall call, SqlValidatorScope scope) { + if (call.getKind() == SqlKind.OVER) { + if (!plannerContext.featureAvailable(EngineFeature.WINDOW_FUNCTIONS)) { + throw buildCalciteContextException( + StringUtils.format( + "The query contains window functions; To run these window functions, enable [%s] in query context.", + EngineFeature.WINDOW_FUNCTIONS), + call); + } + } if (call.getKind() == SqlKind.NULLS_FIRST) { SqlNode op0 = call.getOperandList().get(0); if (op0.getKind() == SqlKind.DESCENDING) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java index 8cd7151dfb75..a5cdc028e7fc 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java @@ -182,9 +182,12 @@ public SqlConformance conformance() return DruidConformance.instance(); } }; - } else { - return null; } + if (aClass.equals(PlannerContext.class)) { + return (C) plannerContext; + } + + return null; } }); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 811227162ac7..0956e2259a46 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -14275,4 +14275,148 @@ public void testUnSupportedNullsLast() .run()); assertThat(e, invalidSqlIs("ASCENDING ordering with NULLS LAST is not supported! (line [1], column [41])")); } + + @Test + public void testWindowingErrorWithoutFeatureFlag() + { + DruidException e = assertThrows(DruidException.class, () -> testBuilder() + .queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, false)) + .sql("SELECT dim1,ROW_NUMBER() OVER () from druid.foo") + .run()); + + assertThat(e, invalidSqlIs("The query contains window functions; To run these window functions, enable [WINDOW_FUNCTIONS] in query context. (line [1], column [13])")); + } + + @Test + public void testInGroupByLimitOutGroupByOrderBy() + { + skipVectorize(); + cannotVectorize(); + testQuery( + "with t AS (SELECT m2, COUNT(m1) as trend_score\n" + + "FROM \"foo\"\n" + + "GROUP BY 1 \n" + + "LIMIT 10\n" + + ")\n" + + "select m2, (MAX(trend_score)) from t\n" + + "where m2 > 2\n" + + "GROUP BY 1 \n" + + "ORDER BY 2 DESC", + QUERY_CONTEXT_DEFAULT, + ImmutableList.of( + new GroupByQuery.Builder() + .setDataSource( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE)) + .threshold(10) + .aggregators(aggregators( + useDefault + ? new CountAggregatorFactory("a0") + : new FilteredAggregatorFactory( + new CountAggregatorFactory("a0"), + notNull("m1") + ) + )) + .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) + .context(OUTER_LIMIT_CONTEXT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("d0", "_d0", ColumnType.DOUBLE) + ) + .setDimFilter( + useDefault ? + bound("d0", "2", null, true, false, null, StringComparators.NUMERIC) : + new RangeFilter("d0", ColumnType.LONG, 2L, null, true, false, null) + ) + .setAggregatorSpecs(aggregators( + new LongMaxAggregatorFactory("_a0", "a0") + )) + .setLimitSpec( + DefaultLimitSpec + .builder() + .orderBy(new OrderByColumnSpec("_a0", Direction.DESCENDING, StringComparators.NUMERIC)) + .build() + ) + .setContext(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of( + new Object[]{3.0D, 1L}, + new Object[]{4.0D, 1L}, + new Object[]{5.0D, 1L}, + new Object[]{6.0D, 1L} + ) + ); + } + + @Test + public void testInGroupByOrderByLimitOutGroupByOrderByLimit() + { + skipVectorize(); + cannotVectorize(); + testQuery( + "with t AS (SELECT m2 as mo, COUNT(m1) as trend_score\n" + + "FROM \"foo\"\n" + + "GROUP BY 1\n" + + "ORDER BY trend_score DESC\n" + + "LIMIT 10)\n" + + "select mo, (MAX(trend_score)) from t\n" + + "where mo > 2\n" + + "GROUP BY 1 \n" + + "ORDER BY 2 DESC LIMIT 2\n", + QUERY_CONTEXT_DEFAULT, + ImmutableList.of( + new GroupByQuery.Builder() + .setDataSource( + new TopNQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .dimension(new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE)) + .threshold(10) + .aggregators(aggregators( + useDefault + ? new CountAggregatorFactory("a0") + : new FilteredAggregatorFactory( + new CountAggregatorFactory("a0"), + notNull("m1") + ) + )) + .metric(new NumericTopNMetricSpec("a0")) + .context(OUTER_LIMIT_CONTEXT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("d0", "_d0", ColumnType.DOUBLE) + ) + .setDimFilter( + useDefault ? + bound("d0", "2", null, true, false, null, StringComparators.NUMERIC) : + new RangeFilter("d0", ColumnType.LONG, 2L, null, true, false, null) + ) + .setAggregatorSpecs(aggregators( + new LongMaxAggregatorFactory("_a0", "a0") + )) + .setLimitSpec( + DefaultLimitSpec + .builder() + .orderBy(new OrderByColumnSpec("_a0", Direction.DESCENDING, StringComparators.NUMERIC)) + .limit(2) + .build() + ) + .setContext(OUTER_LIMIT_CONTEXT) + .build() + ), + ImmutableList.of( + new Object[]{3.0D, 1L}, + new Object[]{4.0D, 1L} + ) + ); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSysQueryTest.java index 0ebd26e65539..5b0a5dd82e3b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSysQueryTest.java @@ -20,8 +20,10 @@ package org.apache.druid.sql.calcite; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.druid.sql.calcite.NotYetSupported.Modes; import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor; +import org.apache.druid.sql.calcite.planner.PlannerContext; import org.junit.Rule; import org.junit.Test; @@ -53,6 +55,7 @@ public void testTasksSumOver() msqIncompatible(); testBuilder() + .queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true)) .sql("select datasource, sum(duration) over () from sys.tasks group by datasource") .expectedResults(ImmutableList.of( new Object[]{"foo", 11L}, From 401a1b4ad1a25d9ec5ed3b12af0e3ec0d7c3aeee Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 25 Oct 2023 22:43:06 +0530 Subject: [PATCH 2/3] remove test added to resolve the conflict --- .../druid/sql/calcite/CalciteQueryTest.java | 133 ------------------ 1 file changed, 133 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 0956e2259a46..d60d6b38ee8f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -14286,137 +14286,4 @@ public void testWindowingErrorWithoutFeatureFlag() assertThat(e, invalidSqlIs("The query contains window functions; To run these window functions, enable [WINDOW_FUNCTIONS] in query context. (line [1], column [13])")); } - - @Test - public void testInGroupByLimitOutGroupByOrderBy() - { - skipVectorize(); - cannotVectorize(); - testQuery( - "with t AS (SELECT m2, COUNT(m1) as trend_score\n" - + "FROM \"foo\"\n" - + "GROUP BY 1 \n" - + "LIMIT 10\n" - + ")\n" - + "select m2, (MAX(trend_score)) from t\n" - + "where m2 > 2\n" - + "GROUP BY 1 \n" - + "ORDER BY 2 DESC", - QUERY_CONTEXT_DEFAULT, - ImmutableList.of( - new GroupByQuery.Builder() - .setDataSource( - new TopNQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .dimension(new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE)) - .threshold(10) - .aggregators(aggregators( - useDefault - ? new CountAggregatorFactory("a0") - : new FilteredAggregatorFactory( - new CountAggregatorFactory("a0"), - notNull("m1") - ) - )) - .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC)) - .context(OUTER_LIMIT_CONTEXT) - .build() - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimensions( - new DefaultDimensionSpec("d0", "_d0", ColumnType.DOUBLE) - ) - .setDimFilter( - useDefault ? - bound("d0", "2", null, true, false, null, StringComparators.NUMERIC) : - new RangeFilter("d0", ColumnType.LONG, 2L, null, true, false, null) - ) - .setAggregatorSpecs(aggregators( - new LongMaxAggregatorFactory("_a0", "a0") - )) - .setLimitSpec( - DefaultLimitSpec - .builder() - .orderBy(new OrderByColumnSpec("_a0", Direction.DESCENDING, StringComparators.NUMERIC)) - .build() - ) - .setContext(OUTER_LIMIT_CONTEXT) - .build() - ), - ImmutableList.of( - new Object[]{3.0D, 1L}, - new Object[]{4.0D, 1L}, - new Object[]{5.0D, 1L}, - new Object[]{6.0D, 1L} - ) - ); - } - - @Test - public void testInGroupByOrderByLimitOutGroupByOrderByLimit() - { - skipVectorize(); - cannotVectorize(); - testQuery( - "with t AS (SELECT m2 as mo, COUNT(m1) as trend_score\n" - + "FROM \"foo\"\n" - + "GROUP BY 1\n" - + "ORDER BY trend_score DESC\n" - + "LIMIT 10)\n" - + "select mo, (MAX(trend_score)) from t\n" - + "where mo > 2\n" - + "GROUP BY 1 \n" - + "ORDER BY 2 DESC LIMIT 2\n", - QUERY_CONTEXT_DEFAULT, - ImmutableList.of( - new GroupByQuery.Builder() - .setDataSource( - new TopNQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(querySegmentSpec(Filtration.eternity())) - .dimension(new DefaultDimensionSpec("m2", "d0", ColumnType.DOUBLE)) - .threshold(10) - .aggregators(aggregators( - useDefault - ? new CountAggregatorFactory("a0") - : new FilteredAggregatorFactory( - new CountAggregatorFactory("a0"), - notNull("m1") - ) - )) - .metric(new NumericTopNMetricSpec("a0")) - .context(OUTER_LIMIT_CONTEXT) - .build() - ) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setDimensions( - new DefaultDimensionSpec("d0", "_d0", ColumnType.DOUBLE) - ) - .setDimFilter( - useDefault ? - bound("d0", "2", null, true, false, null, StringComparators.NUMERIC) : - new RangeFilter("d0", ColumnType.LONG, 2L, null, true, false, null) - ) - .setAggregatorSpecs(aggregators( - new LongMaxAggregatorFactory("_a0", "a0") - )) - .setLimitSpec( - DefaultLimitSpec - .builder() - .orderBy(new OrderByColumnSpec("_a0", Direction.DESCENDING, StringComparators.NUMERIC)) - .limit(2) - .build() - ) - .setContext(OUTER_LIMIT_CONTEXT) - .build() - ), - ImmutableList.of( - new Object[]{3.0D, 1L}, - new Object[]{4.0D, 1L} - ) - ); - } } From 9db32060637674e2046db7585178bb70cfa890f1 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 25 Oct 2023 23:28:05 +0530 Subject: [PATCH 3/3] Trigger Build