From 15c0236bcde7e13d0b5531ec8afce2b8ca0f2fdb Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 25 Aug 2018 13:56:23 -0700 Subject: [PATCH 1/2] SQL: Finalize aggregations for inner queries when necessary. (#6221) * SQL: Finalize aggregations for inner queries when necessary. Fixes #5779. * Fixed test method name. --- .../histogram/sql/QuantileSqlAggregator.java | 5 +- .../aggregation/DimensionExpression.java | 4 +- .../calcite/aggregation/SqlAggregator.java | 6 +- .../ApproxCountDistinctSqlAggregator.java | 27 ++++++-- .../aggregation/builtin/AvgSqlAggregator.java | 9 +-- .../builtin/CountSqlAggregator.java | 6 +- .../aggregation/builtin/MaxSqlAggregator.java | 3 +- .../aggregation/builtin/MinSqlAggregator.java | 3 +- .../aggregation/builtin/SumSqlAggregator.java | 3 +- .../druid/sql/calcite/planner/Calcites.java | 11 ++- .../sql/calcite/rel/DruidOuterQueryRel.java | 18 +++-- .../io/druid/sql/calcite/rel/DruidQuery.java | 34 +++++---- .../druid/sql/calcite/rel/DruidQueryRel.java | 13 ++-- .../io/druid/sql/calcite/rel/DruidRel.java | 6 +- .../druid/sql/calcite/rel/DruidSemiJoin.java | 4 +- .../sql/calcite/rel/PartialDruidQuery.java | 5 +- .../io/druid/sql/calcite/rule/DruidRules.java | 4 +- .../druid/sql/calcite/rule/GroupByRules.java | 6 +- .../druid/sql/calcite/CalciteQueryTest.java | 69 +++++++++++++++++++ .../sql/calcite/planner/CalcitesTest.java | 20 +++--- 20 files changed, 192 insertions(+), 64 deletions(-) diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java index b9c8d3d3d46b..28c0d65b10d2 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java @@ -48,6 +48,7 @@ import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; @@ -62,6 +63,7 @@ public SqlAggFunction calciteFunction() return FUNCTION_INSTANCE; } + @Nullable @Override public Aggregation toDruidAggregation( final PlannerContext plannerContext, @@ -70,7 +72,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { final DruidExpression input = Expressions.toDruidExpression( diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java index d5da02d37b77..abc697c59e75 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/DimensionExpression.java @@ -20,13 +20,13 @@ package io.druid.sql.calcite.aggregation; import com.google.common.collect.ImmutableList; -import io.druid.java.util.common.StringUtils; import io.druid.math.expr.ExprMacroTable; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.VirtualColumn; import io.druid.segment.column.ValueType; import io.druid.sql.calcite.expression.DruidExpression; +import io.druid.sql.calcite.planner.Calcites; import javax.annotation.Nullable; import java.util.List; @@ -85,7 +85,7 @@ public List getVirtualColumns(final ExprMacroTable macroTable) @Nullable public String getVirtualColumnName() { - return expression.isSimpleExtraction() ? null : StringUtils.format("%s:v", outputName); + return expression.isSimpleExtraction() ? null : Calcites.makePrefixedName(outputName, "v"); } @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java index e6983ffb87ab..dcf2c4e8d20d 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java @@ -53,6 +53,9 @@ public interface SqlAggregator * @param project project that should be applied before aggregation; may be null * @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely * ignored if you do not want to re-use existing aggregations. + * @param finalizeAggregations true if this query should include explicit finalization for all of its + * aggregators, where required. This is set for subqueries where Druid's native query + * layer does not do this automatically. * * @return aggregation, or null if the call cannot be translated */ @@ -64,6 +67,7 @@ Aggregation toDruidAggregation( String name, AggregateCall aggregateCall, Project project, - List existingAggregations + List existingAggregations, + boolean finalizeAggregations ); } diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java index 161c3ef9c31e..0abdb8a72060 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/ApproxCountDistinctSqlAggregator.java @@ -22,9 +22,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.StringUtils; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -52,6 +52,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; public class ApproxCountDistinctSqlAggregator implements SqlAggregator @@ -74,7 +75,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { // Don't use Aggregations.getArgumentsForSimpleAggregator, since it won't let us use direct column access @@ -92,14 +94,15 @@ public Aggregation toDruidAggregation( final List virtualColumns = new ArrayList<>(); final AggregatorFactory aggregatorFactory; + final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name; if (arg.isDirectColumnAccess() && rowSignature.getColumnType(arg.getDirectColumn()) == ValueType.COMPLEX) { - aggregatorFactory = new HyperUniquesAggregatorFactory(name, arg.getDirectColumn(), false, true); + aggregatorFactory = new HyperUniquesAggregatorFactory(aggregatorName, arg.getDirectColumn(), false, true); } else { final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName(); final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName); if (inputType == null) { - throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, name); + throw new ISE("Cannot translate sqlTypeName[%s] to Druid type for field[%s]", sqlTypeName, aggregatorName); } final DimensionSpec dimensionSpec; @@ -108,7 +111,7 @@ public Aggregation toDruidAggregation( dimensionSpec = arg.getSimpleExtraction().toDimensionSpec(null, inputType); } else { final ExpressionVirtualColumn virtualColumn = arg.toVirtualColumn( - StringUtils.format("%s:v", name), + Calcites.makePrefixedName(name, "v"), inputType, plannerContext.getExprMacroTable() ); @@ -116,10 +119,20 @@ public Aggregation toDruidAggregation( virtualColumns.add(virtualColumn); } - aggregatorFactory = new CardinalityAggregatorFactory(name, null, ImmutableList.of(dimensionSpec), false, true); + aggregatorFactory = new CardinalityAggregatorFactory( + aggregatorName, + null, + ImmutableList.of(dimensionSpec), + false, + true + ); } - return Aggregation.create(virtualColumns, aggregatorFactory); + return Aggregation.create( + virtualColumns, + Collections.singletonList(aggregatorFactory), + finalizeAggregations ? new HyperUniqueFinalizingPostAggregator(name, aggregatorFactory.getName()) : null + ); } private static class ApproxCountDistinctSqlAggFunction extends SqlAggFunction diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java index 9948c96f4b33..32de829cf4d2 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/AvgSqlAggregator.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import io.druid.java.util.common.StringUtils; import io.druid.math.expr.ExprMacroTable; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -32,6 +31,7 @@ import io.druid.sql.calcite.aggregation.Aggregations; import io.druid.sql.calcite.aggregation.SqlAggregator; import io.druid.sql.calcite.expression.DruidExpression; +import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerContext; import io.druid.sql.calcite.table.RowSignature; import org.apache.calcite.rel.core.AggregateCall; @@ -61,7 +61,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { if (aggregateCall.isDistinct()) { @@ -102,8 +103,8 @@ public Aggregation toDruidAggregation( expression = arg.getExpression(); } - final String sumName = StringUtils.format("%s:sum", name); - final String countName = StringUtils.format("%s:count", name); + final String sumName = Calcites.makePrefixedName(name, "sum"); + final String countName = Calcites.makePrefixedName(name, "count"); final AggregatorFactory sum = SumSqlAggregator.createSumAggregatorFactory( sumType, sumName, diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java index f8a359bf1c4a..d82e581977db 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/CountSqlAggregator.java @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { final List args = Aggregations.getArgumentsForSimpleAggregator( @@ -87,7 +88,8 @@ public Aggregation toDruidAggregation( name, aggregateCall, project, - existingAggregations + existingAggregations, + finalizeAggregations ); } else { return null; diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java index 8be9bdf5538f..6072c114dc56 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MaxSqlAggregator.java @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { if (aggregateCall.isDistinct()) { diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java index f23f2d6c3b70..fb6d00983b4e 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/MinSqlAggregator.java @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { if (aggregateCall.isDistinct()) { diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java index bea1f254f9c0..6b7a9ba03533 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/builtin/SumSqlAggregator.java @@ -60,7 +60,8 @@ public Aggregation toDruidAggregation( final String name, final AggregateCall aggregateCall, final Project project, - final List existingAggregations + final List existingAggregations, + final boolean finalizeAggregations ) { if (aggregateCall.isDistinct()) { diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java index ac566a686059..bf39def562be 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/Calcites.java @@ -297,21 +297,26 @@ public static boolean isIntLiteral(final RexNode rexNode) return rexNode instanceof RexLiteral && SqlTypeName.INT_TYPES.contains(rexNode.getType().getSqlTypeName()); } - public static String findOutputNamePrefix(final String basePrefix, final NavigableSet strings) + public static String findUnusedPrefix(final String basePrefix, final NavigableSet strings) { String prefix = basePrefix; - while (!isUsablePrefix(strings, prefix)) { + while (!isUnusedPrefix(prefix, strings)) { prefix = "_" + prefix; } return prefix; } - private static boolean isUsablePrefix(final NavigableSet strings, final String prefix) + private static boolean isUnusedPrefix(final String prefix, final NavigableSet strings) { // ":" is one character after "9" final NavigableSet subSet = strings.subSet(prefix + "0", true, prefix + ":", false); return subSet.isEmpty(); } + + public static String makePrefixedName(final String prefix, final String suffix) + { + return StringUtils.format("%s:%s", prefix, suffix); + } } diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java index b54cf2d0dd26..56e80c98fbaa 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidOuterQueryRel.java @@ -88,7 +88,11 @@ public PartialDruidQuery getPartialDruidQuery() @Override public Sequence runQuery() { - final DruidQuery query = toDruidQuery(); + // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this + // is the outermost query and it will actually get run as a native query. Druid's native query layer will + // finalize aggregations for the outermost query even if we don't explicitly ask it to. + + final DruidQuery query = toDruidQuery(false); if (query != null) { return getQueryMaker().runQuery(query); } else { @@ -116,9 +120,11 @@ public int getQueryCount() @Nullable @Override - public DruidQuery toDruidQuery() + public DruidQuery toDruidQuery(final boolean finalizeAggregations) { - final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(); + // Must finalize aggregations on subqueries. + + final DruidQuery subQuery = ((DruidRel) sourceRel).toDruidQuery(true); if (subQuery == null) { return null; } @@ -128,7 +134,8 @@ public DruidQuery toDruidQuery() new QueryDataSource(subQuery.toGroupByQuery()), sourceRowSignature, getPlannerContext(), - getCluster().getRexBuilder() + getCluster().getRexBuilder(), + finalizeAggregations ); } @@ -142,7 +149,8 @@ public DruidQuery toDruidQueryForExplaining() sourceRel.getRowType() ), getPlannerContext(), - getCluster().getRexBuilder() + getCluster().getRexBuilder(), + false ); } diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java index bca4481992ff..9740f6815514 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQuery.java @@ -115,7 +115,8 @@ public DruidQuery( final DataSource dataSource, final RowSignature sourceRowSignature, final PlannerContext plannerContext, - final RexBuilder rexBuilder + final RexBuilder rexBuilder, + final boolean finalizeAggregations ) { this.dataSource = dataSource; @@ -126,7 +127,7 @@ public DruidQuery( // Now the fun begins. this.filter = computeWhereFilter(partialQuery, sourceRowSignature, plannerContext); this.selectProjection = computeSelectProjection(partialQuery, plannerContext, sourceRowSignature); - this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder); + this.grouping = computeGrouping(partialQuery, plannerContext, sourceRowSignature, rexBuilder, finalizeAggregations); if (this.selectProjection != null) { this.outputRowSignature = this.selectProjection.getOutputRowSignature(); @@ -199,7 +200,7 @@ private static SelectProjection computeSelectProjection( final List virtualColumns = new ArrayList<>(); final List rowOrder = new ArrayList<>(); - final String virtualColumnPrefix = Calcites.findOutputNamePrefix( + final String virtualColumnPrefix = Calcites.findUnusedPrefix( "v", new TreeSet<>(sourceRowSignature.getRowOrder()) ); @@ -231,7 +232,8 @@ private static Grouping computeGrouping( final PartialDruidQuery partialQuery, final PlannerContext plannerContext, final RowSignature sourceRowSignature, - final RexBuilder rexBuilder + final RexBuilder rexBuilder, + final boolean finalizeAggregations ) { final Aggregate aggregate = partialQuery.getAggregate(); @@ -246,7 +248,8 @@ private static Grouping computeGrouping( partialQuery, plannerContext, sourceRowSignature, - rexBuilder + rexBuilder, + finalizeAggregations ); final RowSignature aggregateRowSignature = RowSignature.from( @@ -340,7 +343,7 @@ private static List computeDimensions( { final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate()); final List dimensions = new ArrayList<>(); - final String outputNamePrefix = Calcites.findOutputNamePrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder())); + final String outputNamePrefix = Calcites.findUnusedPrefix("d", new TreeSet<>(sourceRowSignature.getRowOrder())); int outputNameCounter = 0; for (int i : aggregate.getGroupSet()) { @@ -372,10 +375,13 @@ private static List computeDimensions( /** * Returns aggregations corresponding to {@code aggregate.getAggCallList()}, in the same order. * - * @param partialQuery partial query - * @param plannerContext planner context - * @param sourceRowSignature source row signature - * @param rexBuilder calcite RexBuilder + * @param partialQuery partial query + * @param plannerContext planner context + * @param sourceRowSignature source row signature + * @param rexBuilder calcite RexBuilder + * @param finalizeAggregations true if this query should include explicit finalization for all of its + * aggregators, where required. Useful for subqueries where Druid's native query layer + * does not do this automatically. * * @return aggregations * @@ -385,12 +391,13 @@ private static List computeAggregations( final PartialDruidQuery partialQuery, final PlannerContext plannerContext, final RowSignature sourceRowSignature, - final RexBuilder rexBuilder + final RexBuilder rexBuilder, + final boolean finalizeAggregations ) { final Aggregate aggregate = Preconditions.checkNotNull(partialQuery.getAggregate()); final List aggregations = new ArrayList<>(); - final String outputNamePrefix = Calcites.findOutputNamePrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder())); + final String outputNamePrefix = Calcites.findUnusedPrefix("a", new TreeSet<>(sourceRowSignature.getRowOrder())); for (int i = 0; i < aggregate.getAggCallList().size(); i++) { final String aggName = outputNamePrefix + i; @@ -402,7 +409,8 @@ private static List computeAggregations( partialQuery.getSelectProject(), aggCall, aggregations, - aggName + aggName, + finalizeAggregations ); if (aggregation == null) { diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java index 5d0ea438106b..c304e5babd7e 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java @@ -93,20 +93,21 @@ public static DruidQueryRel fullScan( @Override @Nonnull - public DruidQuery toDruidQuery() + public DruidQuery toDruidQuery(final boolean finalizeAggregations) { return partialQuery.build( druidTable.getDataSource(), druidTable.getRowSignature(), getPlannerContext(), - getCluster().getRexBuilder() + getCluster().getRexBuilder(), + finalizeAggregations ); } @Override public DruidQuery toDruidQueryForExplaining() { - return toDruidQuery(); + return toDruidQuery(false); } @Override @@ -169,7 +170,11 @@ public int getQueryCount() @Override public Sequence runQuery() { - return getQueryMaker().runQuery(toDruidQuery()); + // runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this + // is the outermost query and it will actually get run as a native query. Druid's native query layer will + // finalize aggregations for the outermost query even if we don't explicitly ask it to. + + return getQueryMaker().runQuery(toDruidQuery(false)); } @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java index 32ff1206f4d6..9244b504b271 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidRel.java @@ -76,12 +76,16 @@ public boolean isValidDruidQuery() * * This method may return null if it knows that this rel will yield an empty result set. * + * @param finalizeAggregations true if this query should include explicit finalization for all of its + * aggregators, where required. Useful for subqueries where Druid's native query layer + * does not do this automatically. + * * @return query, or null if it is known in advance that this rel will yield an empty result set. * * @throws CannotBuildQueryException */ @Nullable - public abstract DruidQuery toDruidQuery(); + public abstract DruidQuery toDruidQuery(boolean finalizeAggregations); /** * Convert this DruidRel to a DruidQuery for purposes of explaining. This must be an inexpensive operation. For diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java index 5d6abfc1f01a..ecfd8bbb2b67 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidSemiJoin.java @@ -142,10 +142,10 @@ public DruidSemiJoin withPartialQuery(final PartialDruidQuery newQueryBuilder) @Nullable @Override - public DruidQuery toDruidQuery() + public DruidQuery toDruidQuery(final boolean finalizeAggregations) { final DruidRel rel = getLeftRelWithFilter(); - return rel != null ? rel.toDruidQuery() : null; + return rel != null ? rel.toDruidQuery(finalizeAggregations) : null; } @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java index 263c2e09a1c0..01c960c918f2 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/PartialDruidQuery.java @@ -246,10 +246,11 @@ public DruidQuery build( final DataSource dataSource, final RowSignature sourceRowSignature, final PlannerContext plannerContext, - final RexBuilder rexBuilder + final RexBuilder rexBuilder, + final boolean finalizeAggregations ) { - return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder); + return new DruidQuery(this, dataSource, sourceRowSignature, plannerContext, rexBuilder, finalizeAggregations); } public boolean canAccept(final Stage stage) diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java index 5ec46abfd01e..b565aba995aa 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java +++ b/sql/src/main/java/io/druid/sql/calcite/rule/DruidRules.java @@ -101,7 +101,7 @@ public DruidQueryRule( { super( operand(relClass, operand(DruidRel.class, any())), - StringUtils.format("%s:%s", DruidQueryRule.class.getSimpleName(), stage) + StringUtils.format("%s(%s)", DruidQueryRule.class.getSimpleName(), stage) ); this.stage = stage; this.f = f; @@ -229,7 +229,7 @@ public void onMatch(final RelOptRuleCall call) public DruidOuterQueryRule(final RelOptRuleOperand op, final String description) { - super(op, StringUtils.format("%s:%s", DruidOuterQueryRel.class.getSimpleName(), description)); + super(op, StringUtils.format("%s(%s)", DruidOuterQueryRel.class.getSimpleName(), description)); } @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java index 13089816a4a4..e3b52b49703c 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java +++ b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java @@ -57,7 +57,8 @@ public static Aggregation translateAggregateCall( final Project project, final AggregateCall call, final List existingAggregations, - final String name + final String name, + final boolean finalizeAggregations ) { final DimFilter filter; @@ -125,7 +126,8 @@ public static Aggregation translateAggregateCall( name, call, project, - existingAggregationsWithSameFilter + existingAggregationsWithSameFilter, + finalizeAggregations ); if (retVal == null) { diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 19722de6ee05..85ce904956dc 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -48,6 +48,7 @@ import io.druid.query.aggregation.LongMinAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.post.ArithmeticPostAggregator; import io.druid.query.aggregation.post.ExpressionPostAggregator; @@ -4032,6 +4033,74 @@ public void testExactCountDistinctUsingSubquery() throws Exception ); } + @Test + public void testAvgDailyCountDistinct() throws Exception + { + testQuery( + "SELECT\n" + + " AVG(u)\n" + + "FROM (SELECT FLOOR(__time TO DAY), APPROX_COUNT_DISTINCT(cnt) AS u FROM druid.foo GROUP BY 1)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setVirtualColumns( + EXPRESSION_VIRTUAL_COLUMN( + "d0:v", + "timestamp_floor(\"__time\",'P1D',null,'UTC')", + ValueType.LONG + ) + ) + .setDimensions(DIMS(new DefaultDimensionSpec("d0:v", "d0", ValueType.LONG))) + .setAggregatorSpecs( + AGGS( + new CardinalityAggregatorFactory( + "a0:a", + null, + DIMS(new DefaultDimensionSpec("cnt", "cnt", ValueType.LONG)), + false, + true + ) + ) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new HyperUniqueFinalizingPostAggregator("a0", "a0:a") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + ) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(AGGS( + new LongSumAggregatorFactory("_a0:sum", "a0"), + new CountAggregatorFactory("_a0:count") + )) + .setPostAggregatorSpecs( + ImmutableList.of( + new ArithmeticPostAggregator( + "_a0", + "quotient", + ImmutableList.of( + new FieldAccessPostAggregator(null, "_a0:sum"), + new FieldAccessPostAggregator(null, "_a0:count") + ) + ) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of(new Object[]{1L}) + ); + } + @Test public void testTopNFilterJoin() throws Exception { diff --git a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java index 937ba7f81096..ae0cbf2eca02 100644 --- a/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/planner/CalcitesTest.java @@ -41,16 +41,16 @@ public void testEscapeStringLiteral() } @Test - public void testFindOutputNamePrefix() + public void testFindUnusedPrefix() { - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar"))); - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x"))); - Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x0"))); - Assert.assertEquals("_x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "bar", "x4"))); - Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0"))); - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x"))); - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "_xbxx"))); - Assert.assertEquals("x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "xa", "_x"))); - Assert.assertEquals("__x", Calcites.findOutputNamePrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x"))); + Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x0"))); + Assert.assertEquals("_x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "bar", "x4"))); + Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", "x0"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x2xx", " x"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "_xbxx"))); + Assert.assertEquals("x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "xa", "_x"))); + Assert.assertEquals("__x", Calcites.findUnusedPrefix("x", ImmutableSortedSet.of("foo", "x1a", "_x90"))); } } From dd3c7df6aef62bc13d4803e4edcf719cffdfc608 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 25 Aug 2018 19:21:24 -0700 Subject: [PATCH 2/2] Fix test for 0.12 branch. --- sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 85ce904956dc..5bfa8a3bfab5 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -4051,7 +4051,7 @@ public void testAvgDailyCountDistinct() throws Exception .setVirtualColumns( EXPRESSION_VIRTUAL_COLUMN( "d0:v", - "timestamp_floor(\"__time\",'P1D',null,'UTC')", + "timestamp_floor(\"__time\",'P1D','','UTC')", ValueType.LONG ) )