From d22eef817fa680e184b6eab1f45b889fd1126a14 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 28 Aug 2018 10:59:32 -0700 Subject: [PATCH] SQL: Fix post-aggregator naming logic for sort-project. (#6250) The old code assumes that post-aggregator names are a one-character prefix followed by a number. This isn't always true (we may pad the prefix with underscores to avoid conflicts). Instead, the new code uses a different base prefix for sort-project post-aggregators ("s" instead of "p") and uses the usual Calcites.findUnusedPrefix function to avoid conflicts. --- .../io/druid/sql/calcite/rel/DruidQuery.java | 20 +++--- .../druid/sql/calcite/CalciteQueryTest.java | 68 ++++++++++++++++++- 2 files changed, 76 insertions(+), 12 deletions(-) diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java index d10751010bdd..5a3948e9e139 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java @@ -89,7 +89,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.OptionalInt; import java.util.TreeSet; import java.util.stream.Collectors; @@ -285,7 +284,7 @@ private static Grouping computeGrouping( plannerContext, aggregateRowSignature, aggregateProject, - 0 + "p" ); projectRowOrderAndPostAggregations.postAggregations.forEach( postAggregator -> aggregations.add(Aggregation.create(postAggregator)) @@ -324,17 +323,11 @@ private SortProject computeSortProject( if (sortProject == null) { return null; } else { - final List postAggregators = grouping.getPostAggregators(); - final OptionalInt maybeMaxCounter = postAggregators - .stream() - .mapToInt(postAggregator -> Integer.parseInt(postAggregator.getName().substring(1))) - .max(); - final ProjectRowOrderAndPostAggregations projectRowOrderAndPostAggregations = computePostAggregations( plannerContext, sortingInputRowSignature, sortProject, - 
maybeMaxCounter.orElse(-1) + 1 // 0 if max doesn't exist + "s" ); return new SortProject( @@ -361,12 +354,17 @@ private static ProjectRowOrderAndPostAggregations computePostAggregations( PlannerContext plannerContext, RowSignature inputRowSignature, Project project, - int outputNameCounter + String basePrefix ) { final List rowOrder = new ArrayList<>(); final List aggregations = new ArrayList<>(); + final String outputNamePrefix = Calcites.findUnusedPrefix( + basePrefix, + new TreeSet<>(inputRowSignature.getRowOrder()) + ); + int outputNameCounter = 0; for (final RexNode postAggregatorRexNode : project.getChildExps()) { // Attempt to convert to PostAggregator. final DruidExpression postAggregatorExpression = Expressions.toDruidExpression( @@ -384,7 +382,7 @@ private static ProjectRowOrderAndPostAggregations computePostAggregations( // (There might be a SQL-level type cast that we don't care about) rowOrder.add(postAggregatorExpression.getDirectColumn()); } else { - final String postAggregatorName = "p" + outputNameCounter++; + final String postAggregatorName = outputNamePrefix + outputNameCounter++; final PostAggregator postAggregator = new ExpressionPostAggregator( postAggregatorName, postAggregatorExpression.getExpression(), diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 842a76a8aa52..8683fe415898 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -4124,6 +4124,69 @@ public void testExactCountDistinctUsingSubquery() throws Exception ); } + @Test + public void testMinMaxAvgDailyCountWithLimit() throws Exception + { + testQuery( + "SELECT * FROM (" + + " SELECT max(cnt), min(cnt), avg(cnt), TIME_EXTRACT(max(t), 'EPOCH') last_time, count(1) num_days FROM (\n" + + " SELECT TIME_FLOOR(__time, 'P1D') AS t, count(1) cnt\n" + + " FROM \"foo\"\n" + + " GROUP BY 1\n" + + " )" + + ") 
LIMIT 1\n", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setVirtualColumns( + EXPRESSION_VIRTUAL_COLUMN( + "d0:v", + "timestamp_floor(\"__time\",'P1D','','UTC')", + ValueType.LONG + ) + ) + .setDimensions(DIMS(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG))) + .setAggregatorSpecs(AGGS(new CountAggregatorFactory("a0"))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(AGGS( + new LongMaxAggregatorFactory("_a0", "a0"), + new LongMinAggregatorFactory("_a1", "a0"), + new LongSumAggregatorFactory("_a2:sum", "a0"), + new CountAggregatorFactory("_a2:count"), + new LongMaxAggregatorFactory("_a3", "d0"), + new CountAggregatorFactory("_a4") + )) + .setPostAggregatorSpecs( + ImmutableList.of( + new ArithmeticPostAggregator( + "_a2", + "quotient", + ImmutableList.of( + new FieldAccessPostAggregator(null, "_a2:sum"), + new FieldAccessPostAggregator(null, "_a2:count") + ) + ), + EXPRESSION_POST_AGG("s0", "timestamp_extract(\"_a3\",'EPOCH','UTC')") + ) + ) + .setLimit(1) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of(new Object[]{1L, 1L, 1L, 978480000L, 6L}) + ); + } + @Test public void testAvgDailyCountDistinct() throws Exception { @@ -6595,7 +6658,10 @@ public void testProjectAfterSort2() throws Exception .setAggregatorSpecs( AGGS(new CountAggregatorFactory("a0"), new DoubleSumAggregatorFactory("a1", "m2")) ) - .setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG("p0", "(\"a1\" / \"a0\")"))) + .setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG( + "s0", + "(\"a1\" / \"a0\")" + ))) .setLimitSpec( new DefaultLimitSpec( Collections.singletonList(