From 2e51c6c848212a77dde3bd05fccbb5442f8c5727 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 6 Aug 2018 20:36:14 -0700 Subject: [PATCH 1/2] SQL: Finalize aggregations for inner queries when necessary. Fixes #5779. --- .../histogram/sql/QuantileSqlAggregator.java | 5 +- .../aggregation/DimensionExpression.java | 4 +- .../calcite/aggregation/SqlAggregator.java | 6 +- .../ApproxCountDistinctSqlAggregator.java | 27 ++++-- .../aggregation/builtin/AvgSqlAggregator.java | 9 +- .../builtin/CountSqlAggregator.java | 6 +- .../aggregation/builtin/MaxSqlAggregator.java | 3 +- .../aggregation/builtin/MinSqlAggregator.java | 3 +- .../aggregation/builtin/SumSqlAggregator.java | 3 +- .../druid/sql/calcite/planner/Calcites.java | 11 ++- .../sql/calcite/rel/DruidOuterQueryRel.java | 18 ++-- .../io/druid/sql/calcite/rel/DruidQuery.java | 34 +++++--- .../druid/sql/calcite/rel/DruidQueryRel.java | 13 ++- .../io/druid/sql/calcite/rel/DruidRel.java | 6 +- .../druid/sql/calcite/rel/DruidSemiJoin.java | 4 +- .../sql/calcite/rel/PartialDruidQuery.java | 5 +- .../io/druid/sql/calcite/rule/DruidRules.java | 4 +- .../druid/sql/calcite/rule/GroupByRules.java | 6 +- .../druid/sql/calcite/CalciteQueryTest.java | 86 +++++++++++++++++-- .../sql/calcite/planner/CalcitesTest.java | 18 ++-- 20 files changed, 203 insertions(+), 68 deletions(-) diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java index 830771d4a951..af6473bac07f 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java @@ -48,6 +48,7 @@ import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; @@ -62,6 +63,7 @@ public SqlAggFunction calciteFunction() return FUNCTION_INSTANCE; } + @Nullable @Override public Aggregation toDruidAggregation( final PlannerContext plannerContext, @@ -70,7 +72,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { final DruidExpression input = Expressions.toDruidExpression( diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java index d2573e880ed5..6d4ef48081fb 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java @@ -20,13 +20,13 @@ package io.druid.sql.calcite.aggregation; import com.google.common.collect.ImmutableList; -import io.druid.java.util.common.StringUtils; import io.druid.math.expr.ExprMacroTable; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.VirtualColumn; import io.druid.segment.column.ValueType; import io.druid.sql.calcite.expression.DruidExpression; +import io.druid.sql.calcite.planner.Calcites; import javax.annotation.Nullable; import java.util.List; @@ -80,7 +80,7 @@ public List getVirtualColumns(final ExprMacroTable macroTable) @Nullable public String getVirtualColumnName() { - return expression.isSimpleExtraction() ? null : StringUtils.format("%s:v", outputName); + return expression.isSimpleExtraction() ? null : Calcites.makePrefixedName(outputName, "v"); } @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java index 83ac84ae5c80..4c8777d4af9b 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java @@ -53,6 +53,9 @@ public interface SqlAggregator * @param project project that should be applied before aggregation; may be null * @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely * ignored if you do not want to re-use existing aggregations. + * @param finalizeAggregations true if this query should include explicit finalization for all of its + * aggregators, where required. This is set for subqueries where Druid's native query + * layer does not do this automatically. * * @return aggregation, or null if the call cannot be translated */ @@ -64,6 +67,7 @@ Aggregation toDruidAggregation( String name, AggregateCall aggregateCall, Project project, - List existingAggregations + List existingAggregations, + boolean finalizeAggregations ); } diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java index dfb1c37f477a..208609aaac60 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java @@ -22,9 +22,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.StringUtils; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -52,6 +52,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; public class ApproxCountDistinctSqlAggregator implements SqlAggregator @@ -74,7 +75,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { // Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access @@ -92,14 +94,15 @@ public Aggregation toDruidAggregation( final List virtualColumns = new ArrayList<>(); final AggregatorFactory aggregatorFactory; + final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name; if (arg.isDirectColumnAccess() && rowSignature.getColumnType(arg.getDirectColumn()) == ValueType.COMPLEX) { - aggregatorFactory = new HyperUniquesAggregatorFactory(name, arg.getDirectColumn(), false, true); + aggregatorFactory = new HyperUniquesAggregatorFactory(aggregatorName, arg.getDirectColumn(), false, true); } else { final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName(); final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName); if (inputType == null) { - throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, name); + throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName); } final DimensionSpec dimensionSpec; @@ -108,7 +111,7 @@ public Aggregation toDruidAggregation( dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null, inputType); } else { final ExpressionVirtualColumn virtualColumn = arg.toVirtualColumn( - StringUtils.format("%s:v", name), + Calcites.makePrefixedName(name, "v"), inputType, plannerContext.getExprMacroTable() ); @@ -116,10 +119,20 @@ public Aggregation toDruidAggregation( virtualColumns.add(virtualColumn); } - aggregatorFactory = new CardinalityAggregatorFactory(name, null, ImmutableList.of(dimensionSpec), false, true); + aggregatorFactory = new CardinalityAggregatorFactory( + aggregatorName, + null, + ImmutableList.of(dimensionSpec), + false, + true + ); } - return Aggregation.create(virtualColumns, aggregatorFactory); + return Aggregation.create( + virtualColumns, + Collections.singletonList(aggregatorFactory), + finalizeAggregations ? new HyperUniqueFinalizingPostAggregator(name, aggregatorFactory.getName()) : null + ); } private static class ApproxCountDistinctSqlAggFunction extends SqlAggFunction diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java index e490f0fb8caf..5d695bb26559 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import io.druid.java.util.common.StringUtils; import io.druid.math.expr.ExprMacroTable; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -32,6 +31,7 @@ import io.druid.sql.calcite.aggregation.Aggregations; import io.druid.sql.calcite.aggregation.SqlAggregator; import io.druid.sql.calcite.expression.DruidExpression; +import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerContext; import io.druid.sql.calcite.table.RowSignature; import org.apache.calcite.rel.core.AggregateCall; @@ -61,7 +61,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { if (aggregateCall.isDistinct()) { @@ -102,8 +103,8 @@ public Aggregation toDruidAggregation( expression = arg.getExpression(); } - final String sumName = StringUtils.format("%s:sum", name); - final String countName = StringUtils.format("%s:count", name); + final String sumName = Calcites.makePrefixedName(name, "sum"); + final String countName = Calcites.makePrefixedName(name, "count"); final AggregatorFactory sum = SumSqlAggregator.createSumAggregatorFactory( sumType, sumName, diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java index 61aac896e154..8b47037c2414 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { final List args = Aggregations.getArgumentsForSimpleAggregator( @@ -87,7 +88,8 @@ public Aggregation toDruidAggregation( name, aggregateCall, project, - existingAggregations + existingAggregations, + finalizeAggregations ); } else { return null; diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java index 9dff97d6dd58..5e96057f4ad0 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { if (aggregateCall.isDistinct()) { diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java index 0950cad1410b..81a26d3a5697 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { if (aggregateCall.isDistinct()) { diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java index f8076527aaf3..61a36c8a7e35 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { if (aggregateCall.isDistinct()) { diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java index e59c554027a0..df401b351ecc 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java @@ -339,21 +339,26 @@ public static boolean isIntLiteral(final RexNode rexNode) return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName()); } - public static String findOutputNamePrefix(final String basePrefix, final NavigableSet strings) + public static String findUnusedPrefix(final String basePrefix, final NavigableSet strings) { String prefix = basePrefix; - while (!isUsablePrefix(strings, prefix)) { + while (!isUnusedPrefix(prefix, strings)) { prefix = "_" + prefix; } return prefix; } - private static boolean isUsablePrefix(final NavigableSet strings, final String prefix) + private static boolean isUnusedPrefix(final String prefix, final NavigableSet strings) { // ":" is one character after "9" final NavigableSet subSet = strings.subSet(prefix + "0", true, prefix + ":", false); return subSet.isEmpty(); } + + public static String makePrefixedName(final String prefix, final String suffix) + { + return StringUtils.format("%s:%s", prefix, suffix); + } } diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java index 6caab8d544f1..7d0666cd3bf5 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java @@ -88,7 +88,11 @@ public PartialDruidQuery getPartialDruidQuery() @Override public Sequence runQuery() { - final DruidQuery query = toDruidQuery(); + // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this + // is the outermost query and it will actually get run as a native query. Druid's native query layer will + // finalize aggregations for the outermost query even if we don't explicitly ask it to. + + final DruidQuery query = toDruidQuery(false); if (query != null) { return getQueryMaker().runQuery(query); } else { @@ -116,9 +120,11 @@ public int getQueryCount() @Nullable @Override - public DruidQuery toDruidQuery() + public DruidQuery toDruidQuery(final boolean finalizeAggregations) { - final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(); + // Must finalize aggregations on subqueries. + + final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(true); if (subQuery == null) { return null; } @@ -128,7 +134,8 @@ public DruidQuery toDruidQuery() new QueryDataSource(subQuery.toGroupByQuery()), sourceRowSignature, getPlannerContext(), - getCluster().getRexBuilder() + getCluster().getRexBuilder(), + finalizeAggregations ); } @@ -142,7 +149,8 @@ public DruidQuery toDruidQueryForExplaining() sourceRel.getRowType() ), getPlannerContext(), - getCluster().getRexBuilder() + getCluster().getRexBuilder(), + false ); } diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java index 29801265ed05..e6d5ba5e7676 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java @@ -131,7 +131,8 @@ public DruidQuery( final DataSource dataSource, final RowSignature sourceRowSignature, final PlannerContext plannerContext, - final RexBuilder rexBuilder + final RexBuilder rexBuilder, + final boolean finalizeAggregations ) { this.dataSource = dataSource; @@ -142,7 +143,7 @@ public DruidQuery( // Now the fun begins. this.filter = computeWhereFilter(partialQuery, sourceRowSignature, plannerContext); this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceRowSignature); - this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder); + this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder, finalizeAggregations); final RowSignature sortingInputRowSignature; @@ -222,7 +223,7 @@ private static SelectProjection computeSelectProjection( final List virtualColumns = new ArrayList<>(); final List rowOrder = new ArrayList<>(); - final String virtualColumnPrefix = Calcites.findOutputNamePrefix( + final String virtualColumnPrefix = Calcites.findUnusedPrefix( "v", new TreeSet<>(sourceRowSignature.getRowOrder()) ); @@ -254,7 +255,8 @@ private static Grouping computeGrouping( final PartialDruidQuery partialQuery, final PlannerContext plannerContext, final RowSignature sourceRowSignature, - final RexBuilder rexBuilder + final RexBuilder rexBuilder, + final boolean finalizeAggregations ) { final Aggregate aggregate = partialQuery.getAggregate(); @@ -269,7 +271,8 @@ private static Grouping computeGrouping( partialQuery, plannerContext, sourceRowSignature, - rexBuilder + rexBuilder, + finalizeAggregations ); final RowSignature aggregateRowSignature = RowSignature.from( @@ -428,7 +431,7 @@ private static List computeDimensions( { final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate()); final List dimensions = new ArrayList<>(); - final String outputNamePrefix = Calcites.findOutputNamePrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder())); + final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder())); int outputNameCounter = 0; for (int i : aggregate.getGroupSet()) { @@ -460,10 +463,13 @@ private static List computeDimensions( /** * Returns aggregations corresponding to {@code aggregate.getAggCallList()}, in the same order. * - * @param partialQuery partial query - * @param plannerContext planner context - * @param sourceRowSignature source row signature - * @param rexBuilder calcite RexBuilder + * @param partialQuery partial query + * @param plannerContext planner context + * @param sourceRowSignature source row signature + * @param rexBuilder calcite RexBuilder + * @param finalizeAggregations true if this query should include explicit finalization for all of its + * aggregators, where required. Useful for subqueries where Druid's native query layer + * does not do this automatically. * * @return aggregations * @@ -473,12 +479,13 @@ private static List computeAggregations( final PartialDruidQuery partialQuery, final PlannerContext plannerContext, final RowSignature sourceRowSignature, - final RexBuilder rexBuilder + final RexBuilder rexBuilder, + final boolean finalizeAggregations ) { final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate()); final List aggregations = new ArrayList<>(); - final String outputNamePrefix = Calcites.findOutputNamePrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder())); + final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder())); for (int i = 0; i < aggregate.getAggCallList().size(); i++) { final String aggName = outputNamePrefix + i; @@ -490,7 +497,8 @@ private static List computeAggregations( partialQuery.getSelectProject(), aggCall, aggregations, - aggName + aggName, + finalizeAggregations ); if (aggregation == null) { diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java index 0b1088a8ac2d..57dea9b91aec 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java @@ -93,20 +93,21 @@ public static DruidQueryRel fullScan( @Override @Nonnull - public DruidQuery toDruidQuery() + public DruidQuery toDruidQuery(final boolean finalizeAggregations) { return partialQuery.build( druidTable.getDataSource(), druidTable.getRowSignature(), getPlannerContext(), - getCluster().getRexBuilder() + getCluster().getRexBuilder(), + finalizeAggregations ); } @Override public DruidQuery toDruidQueryForExplaining() { - return toDruidQuery(); + return toDruidQuery(false); } @Override @@ -169,7 +170,11 @@ public int getQueryCount() @Override public Sequence runQuery() { - return getQueryMaker().runQuery(toDruidQuery()); + // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this + // is the outermost query and it will actually get run as a native query. Druid's native query layer will + // finalize aggregations for the outermost query even if we don't explicitly ask it to. + + return getQueryMaker().runQuery(toDruidQuery(false)); } @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java index 7739c9b3e143..12053a75b997 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java @@ -76,12 +76,16 @@ public boolean isValidDruidQuery() * * This method may return null if it knows that this rel will yield an empty result set. * + * @param finalizeAggregations true if this query should include explicit finalization for all of its + * aggregators, where required. Useful for subqueries where Druid's native query layer + * does not do this automatically. + * * @return query, or null if it is known in advance that this rel will yield an empty result set. * * @throws CannotBuildQueryException */ @Nullable - public abstract DruidQuery toDruidQuery(); + public abstract DruidQuery toDruidQuery(boolean finalizeAggregations); /** * Convert this DruidRel to a DruidQuery for purposes of explaining. This must be an inexpensive operation. For diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java index 65fe55f9545e..f54fd51c68ed 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java @@ -143,10 +143,10 @@ public DruidSemiJoin withPartialQuery(final PartialDruidQuery newQueryBuilder) @Nullable @Override - public DruidQuery toDruidQuery() + public DruidQuery toDruidQuery(final boolean finalizeAggregations) { final DruidRel rel = getLeftRelWithFilter(); - return rel != null ? rel.toDruidQuery() : null; + return rel != null ? rel.toDruidQuery(finalizeAggregations) : null; } @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java index 9483fa25f90c..5adfacf69ecf 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java @@ -278,10 +278,11 @@ public DruidQuery build( final DataSource dataSource, final RowSignature sourceRowSignature, final PlannerContext plannerContext, - final RexBuilder rexBuilder + final RexBuilder rexBuilder, + final boolean finalizeAggregations ) { - return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder); + return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder, finalizeAggregations); } public boolean canAccept(final Stage stage) diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java index 7c44c7bd2106..642c8860c9d4 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java @@ -107,7 +107,7 @@ public DruidQueryRule( { super( operand(relClass, operand(DruidRel.class, any())), - StringUtils.format("%s:%s", DruidQueryRule.class.getSimpleName(), stage) + StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(), stage) ); this.stage = stage; this.f = f; @@ -261,7 +261,7 @@ public void onMatch(RelOptRuleCall call) public DruidOuterQueryRule(final RelOptRuleOperand op, final String description) { - super(op, StringUtils.format("%s:%s", DruidOuterQueryRel.class.getSimpleName(), description)); + super(op, StringUtils.format("%s(%s)", DruidOuterQueryRel.class.getSimpleName(), description)); } @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java index 368bfac87b32..c42fba6e45be 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java +++ b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java @@ -57,7 +57,8 @@ public static Aggregation translateAggregateCall( final Project project, final AggregateCall call, final List existingAggregations, - final String name + final String name, + final boolean finalizeAggregations ) { final DimFilter filter; @@ -125,7 +126,8 @@ public static Aggregation translateAggregateCall( name, call, project, - existingAggregationsWithSameFilter + existingAggregationsWithSameFilter, + finalizeAggregations ); if (retVal == null) { diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 7a703412fec9..d8b01567ba6e 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -50,6 +50,7 @@ import io.druid.query.aggregation.LongMinAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ExpressionPostAggregator; @@ -2560,8 +2561,10 @@ public void testGroupByWithSortOnPostAggregationNoTopNConfig() throws Exception .setInterval(QSS(Filtration.eternity())) .setGranularity(Granularities.ALL) .setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0"))) - .setAggregatorSpecs(new FloatMinAggregatorFactory("a0", "m1"), - new FloatMaxAggregatorFactory("a1", "m1")) + .setAggregatorSpecs( + new FloatMinAggregatorFactory("a0", "m1"), + new FloatMaxAggregatorFactory("a1", "m1") + ) .setPostAggregatorSpecs(ImmutableList.of(EXPRESSION_POST_AGG("p0", "(\"a0\" + \"a1\")"))) .setLimitSpec( new DefaultLimitSpec( @@ -2602,8 +2605,10 @@ public void testGroupByWithSortOnPostAggregationNoTopNContext() throws Exception .setInterval(QSS(Filtration.eternity())) .setGranularity(Granularities.ALL) .setDimensions(DIMS(new DefaultDimensionSpec("dim1", "d0"))) - .setAggregatorSpecs(new FloatMinAggregatorFactory("a0", "m1"), - new FloatMaxAggregatorFactory("a1", "m1")) + .setAggregatorSpecs( + new FloatMinAggregatorFactory("a0", "m1"), + new FloatMaxAggregatorFactory("a1", "m1") + ) .setPostAggregatorSpecs( ImmutableList.of( EXPRESSION_POST_AGG("p0", "(\"a0\" + \"a1\")") @@ -4384,6 +4389,74 @@ public void testExactCountDistinctUsingSubquery() throws Exception ); } + @Test + public void testAvgDailyCountDistinct() throws Exception + { + testQuery( + "SELECT\n" + + " AVG(u)\n" + + "FROM (SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT(cnt) AS u FROM druid.foo GROUP BY 1)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setVirtualColumns( + EXPRESSION_VIRTUAL_COLUMN( + "d0:v", + "timestamp_floor(\"__time\",'P1D',null,'UTC')", + ValueType.LONG + ) + ) + .setDimensions(DIMS(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG))) + .setAggregatorSpecs( + AGGS( + new CardinalityAggregatorFactory( + "a0:a", + null, + DIMS(new DefaultDimensionSpec("cnt", "cnt", ValueType.LONG)), + false, + true + ) + ) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new HyperUniqueFinalizingPostAggregator("a0", "a0:a") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(AGGS( + new LongSumAggregatorFactory("_a0:sum", "a0"), + new CountAggregatorFactory("_a0:count") + )) + .setPostAggregatorSpecs( + ImmutableList.of( + new ArithmeticPostAggregator( + "_a0", + "quotient", + ImmutableList.of( + new FieldAccessPostAggregator(null, "_a0:sum"), + new FieldAccessPostAggregator(null, "_a0:count") + ) + ) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of(new Object[]{1L}) + ); + } + @Test public void testTopNFilterJoin() throws Exception { @@ -6897,7 +6970,10 @@ public void testProjectAfterSort2() throws Exception .setAggregatorSpecs( AGGS(new CountAggregatorFactory("a0"), new DoubleSumAggregatorFactory("a1", "m2")) ) - .setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG("p0", "(\"a1\" / \"a0\")"))) + .setPostAggregatorSpecs(Collections.singletonList(EXPRESSION_POST_AGG( + "p0", + "(\"a1\" / \"a0\")" + ))) .setLimitSpec( new DefaultLimitSpec( Collections.singletonList( diff --git a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java index beb503ddaa45..c104cd63a19d 100644 --- a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java @@ -42,14 +42,14 @@ public void testEscapeStringLiteral() @Test public void testFindOutputNamePrefix() { - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar"))); - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x"))); - Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x0"))); - Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x4"))); - Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0"))); - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x"))); - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "_xbxx"))); - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x"))); - Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x"))); + Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x0"))); + Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x4"))); + Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "_xbxx"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x"))); + Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90"))); } } From b19c620036273e23280616874809354b4c784727 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 23 Aug 2018 09:01:30 -0700 Subject: [PATCH 2/2] Fixed test method name. --- .../test/java/io/druid/sql/calcite/planner/CalcitesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java index c104cd63a19d..9542b03062d8 100644 --- a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java @@ -40,7 +40,7 @@ public void testEscapeStringLiteral() } @Test - public void testFindOutputNamePrefix() + public void testFindUnusedPrefix() { Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar"))); Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x")));