From e3afb31635a587a280c90f342b6292dbb9af844b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 2 Nov 2019 19:21:02 -0700 Subject: [PATCH 1/6] SQL: EARLIEST, LATEST aggregators. I chose these names instead of FIRST, LAST because those are already reserved functions in Calcite that mean something different. I think these are also better names anyway. --- docs/querying/sql.md | 4 + .../sql/calcite/aggregation/Aggregations.java | 20 +- .../builtin/EarliestLatestSqlAggregator.java | 221 ++++++++++++++++++ .../calcite/planner/DruidOperatorTable.java | 3 + .../druid/sql/calcite/CalciteQueryTest.java | 154 ++++++++++-- 5 files changed, 382 insertions(+), 20 deletions(-) create mode 100644 sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java diff --git a/docs/querying/sql.md b/docs/querying/sql.md index be420de1e4d6..a932f5e7bdd1 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -195,6 +195,10 @@ Only the COUNT aggregation can accept DISTINCT. |`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| |`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| |`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| +|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.| +|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.| +|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.| +|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.| For advice on choosing approximate aggregation functions, check out our [approximate aggregations documentation](aggregations.html#approx). diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java index c7181424f694..d7f1b34bc8bf 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.util.List; +import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; @@ -48,10 +49,18 @@ public static List getArgumentsForSimpleAggregator( final Project project ) { - return call.getArgList().stream() - .map(i -> Expressions.fromFieldAccess(rowSignature, project, i)) - .map(rexNode -> toDruidExpressionForSimpleAggregator(plannerContext, rowSignature, rexNode)) - .collect(Collectors.toList()); + final List args = call + .getArgList() + .stream() + .map(i -> Expressions.fromFieldAccess(rowSignature, project, i)) + .map(rexNode -> toDruidExpressionForSimpleAggregator(plannerContext, rowSignature, rexNode)) + .collect(Collectors.toList()); + + if (args.stream().noneMatch(Objects::isNull)) { + return args; + } else { + return null; + } } private static DruidExpression toDruidExpressionForSimpleAggregator( @@ -68,7 +77,8 @@ private static DruidExpression toDruidExpressionForSimpleAggregator( if (druidExpression.isSimpleExtraction() && (!druidExpression.isDirectColumnAccess() || rowSignature.getColumnType(druidExpression.getDirectColumn()) == ValueType.STRING)) { - // Aggregators are unable to implicitly cast strings to numbers. So remove the simple extraction in this case. + // Aggregators are unable to implicitly cast strings to numbers. + // So remove the simple extraction, which forces the expression to be used instead of the direct column access. return druidExpression.map(simpleExtraction -> null, Function.identity()); } else { return druidExpression; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java new file mode 100644 index 000000000000..f24b81b02342 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite.aggregation.builtin; + +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.InferTypes; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; +import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; +import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.DruidExpression; +import org.apache.druid.sql.calcite.expression.Expressions; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; +import org.apache.druid.sql.calcite.table.RowSignature; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class EarliestLatestSqlAggregator implements SqlAggregator +{ + public static SqlAggregator EARLIEST = new EarliestLatestSqlAggregator(EarliestOrLatest.EARLIEST); + public static SqlAggregator LATEST = new EarliestLatestSqlAggregator(EarliestOrLatest.LATEST); + + enum EarliestOrLatest + { + EARLIEST { + @Override + AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueType type, int maxStringBytes) + { + switch (type) { + case LONG: + return new LongFirstAggregatorFactory(name, fieldName); + case FLOAT: + return new FloatFirstAggregatorFactory(name, fieldName); + case DOUBLE: + return new DoubleFirstAggregatorFactory(name, fieldName); + case STRING: + return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes); + default: + throw new ISE("Cannot build aggregatorFactory for type[%s]", type); + } + } + }, + + LATEST { + @Override + AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueType type, int maxStringBytes) + { + switch (type) { + case LONG: + return new LongLastAggregatorFactory(name, fieldName); + case FLOAT: + return new FloatLastAggregatorFactory(name, fieldName); + case DOUBLE: + return new DoubleLastAggregatorFactory(name, fieldName); + case STRING: + return new StringLastAggregatorFactory(name, fieldName, maxStringBytes); + default: + throw new ISE("Cannot build aggregatorFactory for type[%s]", type); + } + } + }; + + abstract AggregatorFactory createAggregatorFactory( + String name, + String fieldName, + ValueType outputType, + int maxStringBytes + ); + } + + private final EarliestOrLatest earliestOrLatest; + private final SqlAggFunction function; + + private EarliestLatestSqlAggregator(final EarliestOrLatest earliestOrLatest) + { + this.earliestOrLatest = earliestOrLatest; + this.function = new EarliestLatestSqlAggFunction(earliestOrLatest); + } + + @Override + public SqlAggFunction calciteFunction() + { + return function; + } + + @Nullable + @Override + public Aggregation toDruidAggregation( + final PlannerContext plannerContext, + final RowSignature rowSignature, + final VirtualColumnRegistry virtualColumnRegistry, + final RexBuilder rexBuilder, + final String name, + final AggregateCall aggregateCall, + final Project project, + final List existingAggregations, + final boolean finalizeAggregations + ) + { + final List rexNodes = aggregateCall + .getArgList() + .stream() + .map(i -> Expressions.fromFieldAccess(rowSignature, project, i)) + .collect(Collectors.toList()); + + final List args = Expressions.toDruidExpressions(plannerContext, rowSignature, rexNodes); + + if (args == null) { + return null; + } + + final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name; + final String fieldName; + + if (args.get(0).isDirectColumnAccess()) { + fieldName = args.get(0).getDirectColumn(); + } else { + final SqlTypeName sqlTypeName = rexNodes.get(0).getType().getSqlTypeName(); + final VirtualColumn virtualColumn = + virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, args.get(0), sqlTypeName); + fieldName = virtualColumn.getOutputName(); + } + + // Second arg must be a literal, if it exists (the type signature below requires it). + final int maxBytes = rexNodes.size() > 1 ? RexLiteral.intValue(rexNodes.get(1)) : -1; + + final ValueType outputType = Calcites.getValueTypeForSqlTypeName(aggregateCall.getType().getSqlTypeName()); + if (outputType == null) { + throw new ISE( + "Cannot translate output sqlTypeName[%s] to Druid type for aggregator[%s]", + aggregateCall.getType().getSqlTypeName(), + aggregateCall.getName() + ); + } + + return Aggregation.create( + Stream.of(virtualColumnRegistry.getVirtualColumn(fieldName)) + .filter(Objects::nonNull) + .collect(Collectors.toList()), + Collections.singletonList( + earliestOrLatest.createAggregatorFactory( + aggregatorName, + fieldName, + outputType, + maxBytes + ) + ), + finalizeAggregations ? new FinalizingFieldAccessPostAggregator(name, aggregatorName) : null + ); + } + + private static class EarliestLatestSqlAggFunction extends SqlAggFunction + { + EarliestLatestSqlAggFunction(EarliestOrLatest earliestOrLatest) + { + super( + earliestOrLatest.name(), + null, + SqlKind.OTHER_FUNCTION, + ReturnTypes.ARG0, + InferTypes.RETURN_TYPE, + OperandTypes.or( + OperandTypes.or(OperandTypes.NUMERIC, OperandTypes.BOOLEAN), + OperandTypes.sequence( + "'" + earliestOrLatest.name() + "(expr, maxBytesPerString)'\n", + OperandTypes.STRING, + OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL) + ) + ), + SqlFunctionCategory.STRING, + false, + false + ); + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java index a6b7ce842a50..e010be990195 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java @@ -35,6 +35,7 @@ import org.apache.druid.sql.calcite.aggregation.builtin.ApproxCountDistinctSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.AvgSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.CountSqlAggregator; +import org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator; import org.apache.druid.sql.calcite.aggregation.builtin.SumSqlAggregator; @@ -117,6 +118,8 @@ public class DruidOperatorTable implements SqlOperatorTable .add(new ApproxCountDistinctSqlAggregator()) .add(new AvgSqlAggregator()) .add(new CountSqlAggregator()) + .add(EarliestLatestSqlAggregator.EARLIEST) + .add(EarliestLatestSqlAggregator.LATEST) .add(new MinSqlAggregator()) .add(new MaxSqlAggregator()) .add(new SumSqlAggregator()) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 4b411da4a858..39ebe8602c11 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -43,10 +43,17 @@ import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.ExtractionDimensionSpec; import org.apache.druid.query.extraction.RegexDimExtractionFn; @@ -314,21 +321,21 @@ public void testInformationSchemaTables() throws Exception + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')", ImmutableList.of(), ImmutableList.builder() - .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"}) - .add(new Object[]{"druid", "aview", "VIEW"}) - .add(new Object[]{"druid", "bview", "VIEW"}) - .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) - .build() + .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"}) + .add(new Object[]{"druid", "aview", "VIEW"}) + .add(new Object[]{"druid", "bview", "VIEW"}) + .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"}) + .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"}) + .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"}) + .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) + .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) + .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) + .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) + .build() ); testQuery( @@ -889,6 +896,123 @@ public void testGroupBySingleColumnDescendingNoTopN() throws Exception ); } + @Test + public void testEarliestAggregators() throws Exception + { + // Cannot vectorize EARLIEST aggregator. + skipVectorize(); + + testQuery( + "SELECT " + + "EARLIEST(cnt), EARLIEST(m1), EARLIEST(dim1, 10), " + + "EARLIEST(cnt + 1), EARLIEST(m1 + 1), EARLIEST(dim1 || CAST(cnt AS VARCHAR), 10) " + + "FROM druid.foo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .virtualColumns( + expressionVirtualColumn("v0", "(\"cnt\" + 1)", ValueType.LONG), + expressionVirtualColumn("v1", "(\"m1\" + 1)", ValueType.FLOAT), + expressionVirtualColumn("v2", "concat(\"dim1\",CAST(\"cnt\", 'STRING'))", ValueType.STRING) + ) + .aggregators( + aggregators( + new LongFirstAggregatorFactory("a0", "cnt"), + new FloatFirstAggregatorFactory("a1", "m1"), + new StringFirstAggregatorFactory("a2", "dim1", 10), + new LongFirstAggregatorFactory("a3", "v0"), + new FloatFirstAggregatorFactory("a4", "v1"), + new StringFirstAggregatorFactory("a5", "v2", 10) + ) + ) + .context(TIMESERIES_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{1L, 1.0f, "", 2L, 2.0f, "1"} + ) + ); + } + + @Test + public void testLatestAggregators() throws Exception + { + // Cannot vectorize LATEST aggregator. + skipVectorize(); + + testQuery( + "SELECT " + + "LATEST(cnt), LATEST(m1), LATEST(dim1, 10), " + + "LATEST(cnt + 1), LATEST(m1 + 1), LATEST(dim1 || CAST(cnt AS VARCHAR), 10) " + + "FROM druid.foo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .virtualColumns( + expressionVirtualColumn("v0", "(\"cnt\" + 1)", ValueType.LONG), + expressionVirtualColumn("v1", "(\"m1\" + 1)", ValueType.FLOAT), + expressionVirtualColumn("v2", "concat(\"dim1\",CAST(\"cnt\", 'STRING'))", ValueType.STRING) + ) + .aggregators( + aggregators( + new LongLastAggregatorFactory("a0", "cnt"), + new FloatLastAggregatorFactory("a1", "m1"), + new StringLastAggregatorFactory("a2", "dim1", 10), + new LongLastAggregatorFactory("a3", "v0"), + new FloatLastAggregatorFactory("a4", "v1"), + new StringLastAggregatorFactory("a5", "v2", 10) + ) + ) + .context(TIMESERIES_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{1L, 6.0f, "abc", 2L, 7.0f, "abc1"} + ) + ); + } + + @Test + public void testLatestInSubquery() throws Exception + { + // Cannot vectorize LATEST aggregator. + skipVectorize(); + + testQuery( + "SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new FloatLastAggregatorFactory("a0:a", "m1"))) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", "a0"))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{15.0} + ) + ); + } + @Test public void testGroupByLong() throws Exception { From 45472e74f2d1889089be32f5c8fa3b2beeb4bb9d Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 3 Nov 2019 17:01:45 -0800 Subject: [PATCH 2/6] Finalify. --- .../aggregation/builtin/EarliestLatestSqlAggregator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java index f24b81b02342..347080f9bd86 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java @@ -62,8 +62,8 @@ public class EarliestLatestSqlAggregator implements SqlAggregator { - public static SqlAggregator EARLIEST = new EarliestLatestSqlAggregator(EarliestOrLatest.EARLIEST); - public static SqlAggregator LATEST = new EarliestLatestSqlAggregator(EarliestOrLatest.LATEST); + public static final SqlAggregator EARLIEST = new EarliestLatestSqlAggregator(EarliestOrLatest.EARLIEST); + public static final SqlAggregator LATEST = new EarliestLatestSqlAggregator(EarliestOrLatest.LATEST); enum EarliestOrLatest { From ecb0ca54dcc8b995f9ebcdd078310a8585af36b2 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 6 Nov 2019 09:31:24 -0800 Subject: [PATCH 3/6] SQL updates. --- docs/querying/sql.md | 4 ++-- .../builtin/EarliestLatestSqlAggregator.java | 4 ++-- .../org/apache/druid/sql/calcite/CalciteQueryTest.java | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/querying/sql.md b/docs/querying/sql.md index a932f5e7bdd1..dfa58e137b8d 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -195,9 +195,9 @@ Only the COUNT aggregation can accept DISTINCT. |`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| |`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| |`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.| -|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.| +|`EARLIEST(expr)`|Returns the earliest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.| |`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.| -|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.| +|`LATEST(expr)`|Returns the latest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.| |`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.| For advice on choosing approximate aggregation functions, check out our [approximate aggregations documentation](aggregations.html#approx). diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java index 347080f9bd86..0a1db7781d72 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java @@ -79,7 +79,7 @@ AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueTy case DOUBLE: return new DoubleFirstAggregatorFactory(name, fieldName); case STRING: - return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes); + return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes, false); default: throw new ISE("Cannot build aggregatorFactory for type[%s]", type); } @@ -98,7 +98,7 @@ AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueTy case DOUBLE: return new DoubleLastAggregatorFactory(name, fieldName); case STRING: - return new StringLastAggregatorFactory(name, fieldName, maxStringBytes); + return new StringLastAggregatorFactory(name, fieldName, maxStringBytes, false); default: throw new ISE("Cannot build aggregatorFactory for type[%s]", type); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 39ebe8602c11..02f97e9366a2 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -921,10 +921,10 @@ public void testEarliestAggregators() throws Exception aggregators( new LongFirstAggregatorFactory("a0", "cnt"), new FloatFirstAggregatorFactory("a1", "m1"), - new StringFirstAggregatorFactory("a2", "dim1", 10), + new StringFirstAggregatorFactory("a2", "dim1", 10, false), new LongFirstAggregatorFactory("a3", "v0"), new FloatFirstAggregatorFactory("a4", "v1"), - new StringFirstAggregatorFactory("a5", "v2", 10) + new StringFirstAggregatorFactory("a5", "v2", 10, false) ) ) .context(TIMESERIES_CONTEXT_DEFAULT) @@ -961,10 +961,10 @@ public void testLatestAggregators() throws Exception aggregators( new LongLastAggregatorFactory("a0", "cnt"), new FloatLastAggregatorFactory("a1", "m1"), - new StringLastAggregatorFactory("a2", "dim1", 10), + new StringLastAggregatorFactory("a2", "dim1", 10, false), new LongLastAggregatorFactory("a3", "v0"), new FloatLastAggregatorFactory("a4", "v1"), - new StringLastAggregatorFactory("a5", "v2", 10) + new StringLastAggregatorFactory("a5", "v2", 10, false) ) ) .context(TIMESERIES_CONTEXT_DEFAULT) @@ -1008,7 +1008,7 @@ public void testLatestInSubquery() throws Exception .build() ), ImmutableList.of( - new Object[]{15.0} + new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0} ) ); } From dbde1af1835aa7a2e771d2c148b76c9f7913b41b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 7 Nov 2019 18:37:25 -0800 Subject: [PATCH 4/6] Adjust aggregator calls. --- .../aggregation/builtin/EarliestLatestSqlAggregator.java | 4 ++-- .../org/apache/druid/sql/calcite/CalciteQueryTest.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java index 0a1db7781d72..347080f9bd86 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestSqlAggregator.java @@ -79,7 +79,7 @@ AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueTy case DOUBLE: return new DoubleFirstAggregatorFactory(name, fieldName); case STRING: - return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes, false); + return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes); default: throw new ISE("Cannot build aggregatorFactory for type[%s]", type); } @@ -98,7 +98,7 @@ AggregatorFactory createAggregatorFactory(String name, String fieldName, ValueTy case DOUBLE: return new DoubleLastAggregatorFactory(name, fieldName); case STRING: - return new StringLastAggregatorFactory(name, fieldName, maxStringBytes, false); + return new StringLastAggregatorFactory(name, fieldName, maxStringBytes); default: throw new ISE("Cannot build aggregatorFactory for type[%s]", type); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 02f97e9366a2..a96951880490 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -921,10 +921,10 @@ public void testEarliestAggregators() throws Exception aggregators( new LongFirstAggregatorFactory("a0", "cnt"), new FloatFirstAggregatorFactory("a1", "m1"), - new StringFirstAggregatorFactory("a2", "dim1", 10, false), + new StringFirstAggregatorFactory("a2", "dim1", 10), new LongFirstAggregatorFactory("a3", "v0"), new FloatFirstAggregatorFactory("a4", "v1"), - new StringFirstAggregatorFactory("a5", "v2", 10, false) + new StringFirstAggregatorFactory("a5", "v2", 10) ) ) .context(TIMESERIES_CONTEXT_DEFAULT) @@ -961,10 +961,10 @@ public void testLatestAggregators() throws Exception aggregators( new LongLastAggregatorFactory("a0", "cnt"), new FloatLastAggregatorFactory("a1", "m1"), - new StringLastAggregatorFactory("a2", "dim1", 10, false), + new StringLastAggregatorFactory("a2", "dim1", 10), new LongLastAggregatorFactory("a3", "v0"), new FloatLastAggregatorFactory("a4", "v1"), - new StringLastAggregatorFactory("a5", "v2", 10, false) + new StringLastAggregatorFactory("a5", "v2", 10) ) ) .context(TIMESERIES_CONTEXT_DEFAULT) From de41b4f5c49c880cbe0b0b943f664f3a4ce65315 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 7 Nov 2019 18:58:54 -0800 Subject: [PATCH 5/6] Validations, test updates. --- .../aggregation/first/StringFirstAggregatorFactory.java | 6 ++++++ .../query/aggregation/last/StringLastAggregatorFactory.java | 6 ++++++ .../java/org/apache/druid/sql/calcite/CalciteQueryTest.java | 2 +- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java index 983ed9b2202b..1b30bf716fe2 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -102,6 +103,11 @@ public StringFirstAggregatorFactory( { Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + + if (maxStringBytes != null && maxStringBytes < 0) { + throw new IAE("maxStringBytes must be greater than 0"); + } + this.name = name; this.fieldName = fieldName; this.maxStringBytes = maxStringBytes == null diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java index b024af022dbd..9277d0529dd7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -58,6 +59,11 @@ public StringLastAggregatorFactory( { Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + + if (maxStringBytes != null && maxStringBytes < 0) { + throw new IAE("maxStringBytes must be greater than 0"); + } + this.name = name; this.fieldName = fieldName; this.maxStringBytes = maxStringBytes == null diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index a96951880490..251272b31959 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -931,7 +931,7 @@ public void testEarliestAggregators() throws Exception .build() ), ImmutableList.of( - new Object[]{1L, 1.0f, "", 2L, 2.0f, "1"} + new Object[]{1L, 1.0f, NullHandling.sqlCompatible() ? "" : "10.1", 2L, 2.0f, "1"} ) ); } From 3a94e8522c08407ea7e40aefbf669e33a1b4560f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 8 Nov 2019 14:14:11 -0800 Subject: [PATCH 6/6] Review docs. --- .../sql/calcite/aggregation/Aggregations.java | 16 +++++++++++++++- .../sql/calcite/expression/Expressions.java | 2 +- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java index d7f1b34bc8bf..d54b20caeb8c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/Aggregations.java @@ -41,12 +41,26 @@ private Aggregations() // No instantiation. } + /** + * Get Druid expressions that correspond to "simple" aggregator inputs. This is used by standard sum/min/max + * aggregators, which have the following properties: + * + * 1) They can take direct field accesses or expressions as inputs. + * 2) They cannot implicitly cast strings to numbers when using a direct field access. + * + * @param plannerContext SQL planner context + * @param rowSignature input row signature + * @param call aggregate call object + * @param project project that should be applied before aggregation; may be null + * + * @return list of expressions corresponding to aggregator arguments, or null if any cannot be translated + */ @Nullable public static List getArgumentsForSimpleAggregator( final PlannerContext plannerContext, final RowSignature rowSignature, final AggregateCall call, - final Project project + @Nullable final Project project ) { final List args = call diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java index 4764cf414686..9b6ceaa6bffd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/Expressions.java @@ -88,7 +88,7 @@ private Expressions() */ public static RexNode fromFieldAccess( final RowSignature rowSignature, - final Project project, + @Nullable final Project project, final int fieldNumber ) {