From d38f6b258df602d4f8d52e57c7946ec951c30280 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 12 Jan 2024 02:57:45 +0530 Subject: [PATCH 1/8] changes --- docs/multi-stage-query/concepts.md | 4 +- docs/multi-stage-query/known-issues.md | 5 --- docs/querying/aggregations.md | 7 ++-- docs/querying/sql-aggregations.md | 4 +- .../query/aggregation/AggregatorFactory.java | 18 +++++++++ .../EarliestLatestAnySqlAggregator.java | 3 +- .../EarliestLatestBySqlAggregator.java | 38 +++++++++++++++++-- .../sql/calcite/CalciteJoinQueryTest.java | 6 --- .../druid/sql/calcite/CalciteQueryTest.java | 32 ++++++++-------- 9 files changed, 77 insertions(+), 40 deletions(-) diff --git a/docs/multi-stage-query/concepts.md b/docs/multi-stage-query/concepts.md index 7100e14d01cf..d9385061d79f 100644 --- a/docs/multi-stage-query/concepts.md +++ b/docs/multi-stage-query/concepts.md @@ -200,8 +200,8 @@ rollup-related metadata into the generated segments. Other applications can then queries](../querying/segmentmetadataquery.md) to retrieve rollup-related information. The following [aggregation functions](../querying/sql-aggregations.md) are supported for rollup at ingestion time: -`COUNT` (but switch to `SUM` at query time), `SUM`, `MIN`, `MAX`, `EARLIEST` and `EARLIEST_BY` ([string only](known-issues.md#select-statement)), -`LATEST` and `LATEST_BY` ([string only](known-issues.md#select-statement)), `APPROX_COUNT_DISTINCT`, `APPROX_COUNT_DISTINCT_BUILTIN`, +`COUNT` (but switch to `SUM` at query time), `SUM`, `MIN`, `MAX`, `EARLIEST` and `EARLIEST_BY`, +`LATEST` and `LATEST_BY`, `APPROX_COUNT_DISTINCT`, `APPROX_COUNT_DISTINCT_BUILTIN`, `APPROX_COUNT_DISTINCT_DS_HLL`, `APPROX_COUNT_DISTINCT_DS_THETA`, and `DS_QUANTILES_SKETCH` (but switch to `APPROX_QUANTILE_DS` at query time). Do not use `AVG`; instead, use `SUM` and `COUNT` at ingest time and compute the quotient at query time. 
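The ingest-time/query-time split described in the concepts.md change above (an ingest-time `COUNT` re-aggregated with `SUM` at query time, and `AVG` computed as the quotient of an ingested `SUM` and `COUNT`) can be sketched as follows. This is an illustrative Java sketch with hypothetical names operating on already rolled-up partial aggregates, not Druid code:

```java
public class RollupQueryTime {
    // At query time, an ingest-time COUNT column is re-aggregated with SUM,
    // because each rolled-up row already holds a partial count of raw rows.
    static long totalCount(long[] partialCounts) {
        long total = 0;
        for (long c : partialCounts) {
            total += c;
        }
        return total;
    }

    // AVG is never ingested directly: ingest SUM and COUNT,
    // then take their quotient at query time.
    static double average(double[] partialSums, long[] partialCounts) {
        double sum = 0;
        for (double s : partialSums) {
            sum += s;
        }
        return sum / totalCount(partialCounts);
    }

    public static void main(String[] args) {
        // Two rolled-up rows: (count=2, sum=10.0) and (count=3, sum=20.0).
        long[] counts = {2, 3};
        double[] sums = {10.0, 20.0};
        System.out.println(totalCount(counts));   // 5
        System.out.println(average(sums, counts)); // 6.0
    }
}
```

Computing the average from the rolled-up partials gives the same result as averaging the raw rows, which is why the docs recommend this decomposition over ingesting `AVG`.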
diff --git a/docs/multi-stage-query/known-issues.md b/docs/multi-stage-query/known-issues.md index f4e97dc23dad..2a67dafb0f6a 100644 --- a/docs/multi-stage-query/known-issues.md +++ b/docs/multi-stage-query/known-issues.md @@ -42,11 +42,6 @@ an [UnknownError](./reference.md#error_UnknownError) with a message including "N - `GROUPING SETS` are not implemented. Queries using these features return a [QueryNotSupported](reference.md#error_QueryNotSupported) error. -- The numeric varieties of the `EARLIEST` and `LATEST` aggregators do not work properly. Attempting to use the numeric - varieties of these aggregators lead to an error like - `java.lang.ClassCastException: class java.lang.Double cannot be cast to class org.apache.druid.collections.SerializablePair`. - The string varieties, however, do work properly. - ## `INSERT` and `REPLACE` Statements - The `INSERT` and `REPLACE` statements with column lists, like `INSERT INTO tbl (a, b, c) SELECT ...`, is not implemented. diff --git a/docs/querying/aggregations.md b/docs/querying/aggregations.md index c7f798011973..8ef8287a9822 100644 --- a/docs/querying/aggregations.md +++ b/docs/querying/aggregations.md @@ -177,10 +177,9 @@ Example: The first and last aggregators determine the metric values that respectively correspond to the earliest and latest values of a time column. -Do not use first and last aggregators for the double, float, and long types in an ingestion spec. They are only supported for queries. -The string-typed aggregators, `stringFirst` and `stringLast`, are supported for both ingestion and querying. - -Queries with first or last aggregators on a segment created with rollup return the rolled up value, not the first or last value from the raw ingested data. +Queries with first or last aggregators on a segment created with rollup return the rolled up value, not the first or last value from the +raw ingested data. 
The `timeColumn` will get ignored in such cases, and the aggregation will use the original value of the time column +stored at the time the segment was created. #### Numeric first and last aggregators diff --git a/docs/querying/sql-aggregations.md b/docs/querying/sql-aggregations.md index b2df640a68f0..bb0998bf1fb0 100644 --- a/docs/querying/sql-aggregations.md +++ b/docs/querying/sql-aggregations.md @@ -87,9 +87,9 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and |`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` or `0` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` or `0` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`EARLIEST(expr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "earliest" is taken from the row with the overall earliest non-null value of the timestamp column.
If the earliest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue`is omitted; it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| -|`EARLIEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`.
If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue`is omitted; it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| +|`EARLIEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`.
If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type, `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`. It cannot be used with rollup tables created with any variant of the EARLIEST/LATEST/EARLIEST_BY/LATEST_BY aggregators, since the intermediate type already contains the timestamp and a timestamp passed explicitly is ignored by the native engine. Use `EARLIEST` in such a scenario. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`LATEST(expr, [maxBytesPerValue])`|Returns the latest value of `expr`
The `expr` must come from a relation with a timestamp column (like `__time` in a Druid datasource) and the "latest" is taken from the row with the overall latest non-null value of the timestamp column.
If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue`is omitted; it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| -|`LATEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the latest value of `expr`.
The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`.
If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue`is omitted; it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| +|`LATEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the latest value of `expr`.
The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`.
If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type, `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`. It cannot be used with rollup tables created with any variant of the EARLIEST/LATEST/EARLIEST_BY/LATEST_BY aggregators, since the intermediate type already contains the timestamp and a timestamp passed explicitly is ignored by the native engine. Use `LATEST` in such a scenario. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`ANY_VALUE(expr, [maxBytesPerValue, [aggregateMultipleValues]])`|Returns any value of `expr` including null. This aggregator can simplify and optimize the performance by returning the first encountered value (including `null`).

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted; it defaults to `1024`. `aggregateMultipleValues` is an optional boolean flag controls the behavior of aggregating a [multi-value dimension](./multi-value-dimensions.md). `aggregateMultipleValues` is set as true by default and returns the stringified array in case of a multi-value dimension. By setting it to false, function will return first value instead. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`GROUPING(expr, expr...)`|Returns a number to indicate which groupBy dimension is included in a row, when using `GROUPING SETS`. Refer to [additional documentation](aggregations.md#grouping-aggregator) on how to infer this number.|N/A| |`ARRAY_AGG(expr, [size])`|Collects all values of `expr` into an ARRAY, including null values, with `size` in bytes limit on aggregation size (default of 1024 bytes). If the aggregated array grows larger than the maximum size in bytes, the query will fail. Use of `ORDER BY` within the `ARRAY_AGG` expression is not currently supported, and the ordering of results within the output array may vary depending on processing order.|`null`| diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java index 8567c4b6d6c6..9b86244d672c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java @@ -39,6 +39,24 @@ import java.util.Map; /** + * Example of ingestion-time rollup for the query: + * SELECT dim1, SUM(m1) FROM foo GROUP BY dim1 + * + * { + * dimensions: dim1 + * metrics: a0 + * aggregators: SumAggregatorFactory() + * } + * + * Input rows (dim1, m1): + * a 1 + * a 3 + * b 2 + * a 5 + * b 5 + * a -> SumAggregator (1 + 3 + 5) + * b -> SumAggregator (2 + 5) + * + * * AggregatorFactory is a strategy (in the terms of Design Patterns) that represents column aggregation, e.g.
min, * max, sum of metric columns, or cardinality of dimension columns (see {@link * org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory}). diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java index 66bbdf8a49bf..d16ffe441ed6 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java @@ -229,7 +229,7 @@ public Aggregation toDruidAggregation( ); } - final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0)); + final String fieldName = getColumnName(virtualColumnRegistry, args.get(0), rexNodes.get(0)); if (!inputAccessor.getInputRowSignature().contains(ColumnHolder.TIME_COLUMN_NAME) && (aggregatorType == AggregatorType.LATEST || aggregatorType == AggregatorType.EARLIEST)) { @@ -291,7 +291,6 @@ public Aggregation toDruidAggregation( } static String getColumnName( - PlannerContext plannerContext, VirtualColumnRegistry virtualColumnRegistry, DruidExpression arg, RexNode rexNode diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java index fac88d853e11..837758094843 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite.aggregation.builtin; import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexLiteral; import 
org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlAggFunction; @@ -33,6 +34,10 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde; +import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde; import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.sql.calcite.aggregation.Aggregation; @@ -44,6 +49,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.InputAccessor; import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; +import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; import java.util.Collections; @@ -100,7 +106,6 @@ public Aggregation toDruidAggregation( } final String fieldName = EarliestLatestAnySqlAggregator.getColumnName( - plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0) @@ -109,11 +114,13 @@ public Aggregation toDruidAggregation( final AggregatorFactory theAggFactory; switch (args.size()) { case 2: + if (isMetricPreAggregated(plannerContext, rexNodes.get(0), function.getName())) { + return null; + } theAggFactory = aggregatorType.createAggregatorFactory( aggregatorName, fieldName, EarliestLatestAnySqlAggregator.getColumnName( - plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1) @@ -136,11 +143,13 @@ public Aggregation toDruidAggregation( ); return null; } + if (isMetricPreAggregated(plannerContext, rexNodes.get(0), function.getName())) { + return null; + } theAggFactory = aggregatorType.createAggregatorFactory( 
aggregatorName, fieldName, EarliestLatestAnySqlAggregator.getColumnName( - plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1) @@ -165,6 +174,29 @@ public Aggregation toDruidAggregation( ); } + private static boolean isMetricPreAggregated(PlannerContext plannerContext, RexNode rexNode, String byAggregator) + { + final RelDataType type = rexNode.getType(); + if (type instanceof RowSignatures.ComplexSqlType) { + String complexColumnTypeName = ((RowSignatures.ComplexSqlType) type).getColumnType().getComplexTypeName(); + if ((SerializablePairLongLongComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) + || SerializablePairLongFloatComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) + || SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) + || SerializablePairLongStringComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName))) { + plannerContext.setPlanningError( + "Cannot call %s with an explicit 'timeExpr' column for pre-aggregated metric of type [%s]. Use %s instead " + + "to further rollup the complex column.", + byAggregator, + complexColumnTypeName, + byAggregator.substring(0, byAggregator.length() - 3) + ); + return true; + } + return false; + } + return false; + } + private static class EarliestByLatestBySqlAggFunction extends SqlAggFunction { private static final SqlReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE = diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index c43e298d847f..7949badff5d3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -861,8 +861,6 @@ public void testFilterAndGroupByLookupUsingJoinOperatorWithNotFilter(Map queryContext) { - // MSQ does not support UNION ALL. - msqIncompatible(); // Cannot vectorize JOIN operator. 
cannotVectorize(); @@ -3841,10 +3839,6 @@ public void testJoinWithExplicitIsNotDistinctFromCondition(Map q @Parameters(source = QueryContextForJoinProvider.class) public void testInnerJoinSubqueryWithSelectorFilter(Map queryContext) { - if (sortBasedJoin) { - // Cannot handle the [l1.k = 'abc'] condition. - msqIncompatible(); - } // Cannot vectorize due to 'concat' expression. cannotVectorize(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index b0e607b61a00..cce50e13553f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -638,8 +638,6 @@ public void testGroupBySingleColumnDescendingNoTopN() @Test public void testEarliestAggregators() { - msqIncompatible(); - testQuery( "SELECT " + "EARLIEST(cnt), EARLIEST(m1), EARLIEST(dim1, 10), EARLIEST(dim1, CAST(10 AS INTEGER)), " @@ -1107,6 +1105,20 @@ public void testNumericLatestEarliestGroupBy() ); } + @Test + public void testLatestByEarliestByWithRollupColumn() + { + assertQueryIsUnplannable( + "SELECT EARLIEST_BY(long_last_added, __time) FROM wikipedia_first_last", + "Cannot call EARLIEST_BY with an explicit 'timeExpr' column for pre-aggregated metric of type [serializablePairLongLong]. Use EARLIEST instead to further rollup the complex column." + ); + assertQueryIsUnplannable( + "SELECT LATEST_BY(long_last_added, __time) FROM wikipedia_first_last", + "Cannot call LATEST_BY with an explicit 'timeExpr' column for pre-aggregated metric of type [serializablePairLongLong]. Use LATEST instead to further rollup the complex column." 
+ ); + } + + @Test public void testNumericLatestEarliestWithOpratorsGroupBy() { @@ -1200,8 +1212,6 @@ public void testStringLatestByGroupByWithAlwaysFalseCondition() @Test public void testPrimitiveEarliestInSubquery() { - msqIncompatible(); - testQuery( "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)", ImmutableList.of( @@ -1408,7 +1418,6 @@ public void testPrimitiveAnyInSubquery() @Test public void testStringEarliestSingleStringDim() { - msqIncompatible(); testQuery( "SELECT dim2, EARLIEST(dim1,10) AS val FROM foo GROUP BY dim2", ImmutableList.of( @@ -1524,8 +1533,6 @@ public void testStringAnyInSubquery() @Test public void testEarliestAggregatorsNumericNulls() { - msqIncompatible(); - testQuery( "SELECT EARLIEST(l1), EARLIEST(d1), EARLIEST(f1) FROM druid.numfoo", ImmutableList.of( @@ -1583,8 +1590,6 @@ public void testLatestAggregatorsNumericNull() @Test public void testFirstLatestAggregatorsSkipNulls() { - msqIncompatible(); - final DimFilter filter; if (useDefault) { filter = notNull("dim1"); @@ -1697,8 +1702,6 @@ public void testAnyAggregatorsSkipNullsWithFilter() @Test public void testOrderByEarliestFloat() { - msqIncompatible(); - List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1744,8 +1747,6 @@ public void testOrderByEarliestFloat() @Test public void testOrderByEarliestDouble() { - msqIncompatible(); - List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1791,8 +1792,6 @@ public void testOrderByEarliestDouble() @Test public void testOrderByEarliestLong() { - msqIncompatible(); - List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -9660,7 +9659,9 @@ public void testTimeseriesEmptyResultsAggregatorDefaultValues() @Test public void testTimeseriesEmptyResultsAggregatorDefaultValuesNonVectorized() { + // Empty-dataset aggregation queries in MSQ return 
an empty row, rather than a single row as SQL requires. msqIncompatible(); + cannotVectorize(); skipVectorize(); // timeseries with all granularity have a single group, so should return default results for given aggregators @@ -9976,7 +9977,6 @@ public void testGroupByAggregatorDefaultValues() @Test public void testGroupByAggregatorDefaultValuesNonVectorized() { - msqIncompatible(); cannotVectorize(); skipVectorize(); testQuery( From 6b1190c5b807fa243d2eadda09b9a70fbc735872 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 17 Jan 2024 11:13:46 +0530 Subject: [PATCH 2/8] fix tests --- .../EarliestLatestAnySqlAggregator.java | 4 +- .../EarliestLatestBySqlAggregator.java | 89 +++++++++++++++---- .../druid/sql/calcite/CalciteQueryTest.java | 4 +- 3 files changed, 75 insertions(+), 22 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java index d16ffe441ed6..b47985ea95a9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java @@ -359,7 +359,9 @@ public TimeColIdentifer() @Override public R accept(SqlVisitor visitor) { - + // We override the "accept()" method, because the __time column's presence is determined when Calcite is converting + // the identifiers to the fully qualified column names with prefixes.
This is where the validation exception can + // trigger try { return super.accept(visitor); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java index 837758094843..c445bb17f5c4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java @@ -39,6 +39,7 @@ import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde; import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde; import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; +import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; @@ -63,7 +64,9 @@ public class EarliestLatestBySqlAggregator implements SqlAggregator private final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType; private final SqlAggFunction function; - private EarliestLatestBySqlAggregator(final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType) + private EarliestLatestBySqlAggregator( + final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType + ) { this.aggregatorType = aggregatorType; this.function = new EarliestByLatestBySqlAggFunction(aggregatorType); @@ -114,7 +117,12 @@ public Aggregation toDruidAggregation( final AggregatorFactory theAggFactory; switch (args.size()) { case 2: - if (isMetricPreAggregated(plannerContext, rexNodes.get(0), function.getName())) { + if (passedExplicitTimeColumnToPreAggregatedMetric( + plannerContext, + args.get(1), + rexNodes.get(0), + function.getName() + )) { return null; } theAggFactory = 
aggregatorType.createAggregatorFactory( @@ -143,7 +151,12 @@ public Aggregation toDruidAggregation( ); return null; } - if (isMetricPreAggregated(plannerContext, rexNodes.get(0), function.getName())) { + if (passedExplicitTimeColumnToPreAggregatedMetric( + plannerContext, + args.get(1), + rexNodes.get(0), + function.getName() + )) { return null; } theAggFactory = aggregatorType.createAggregatorFactory( @@ -174,25 +187,63 @@ public Aggregation toDruidAggregation( ); } - private static boolean isMetricPreAggregated(PlannerContext plannerContext, RexNode rexNode, String byAggregator) + /** + * Returns true if we have passed a time column to a pre-aggregated first/last metric. It is used to detect calls of + * type EARLIEST_BY(firstPreAggregated, customTimestampExpr). This is disallowed, and the caller should instead do + * EARLIEST(firstPreAggregated) to further rollup the column. The time information will get ignored in the native + * layer, therefore the SQL prevents the users from making such calls. + * This also sets a planning error with the appropriate message, and the caller should either error out or refuse + * aggregation if this is true. + */ + private static boolean passedExplicitTimeColumnToPreAggregatedMetric( + PlannerContext plannerContext, + DruidExpression timeColumnDruidExpression, + RexNode metricRexNode, + String aggregatorName + ) { - final RelDataType type = rexNode.getType(); + if (isTimeArgModified(timeColumnDruidExpression) && isMetricPreAggregated(metricRexNode)) { + final RelDataType type = metricRexNode.getType(); + String complexColumnTypeName = ((RowSignatures.ComplexSqlType) type).getColumnType().getComplexTypeName(); + plannerContext.setPlanningError( + "Cannot call %s with an explicit 'timeExpr' column for pre-aggregated metric of type [%s]. 
Use %s instead " + + "to further rollup the complex column.", + aggregatorName, + complexColumnTypeName, + aggregatorName.substring(0, aggregatorName.length() - 3) + ); + return true; + } + return false; + } + + /** + * Checks whether the given DruidExpression refers to a direct column named __time. It's not a perfect check to + * figure out if EARLIEST/LATEST or EARLIEST_BY/LATEST_BY have been used, because this would return true if + * EARLIEST_BY(metric, __time) has been used, which is incorrect. However, it's impossible to figure out which call + * has been made, since EARLIEST gets rewritten to EARLIEST_BY multiple times within Calcite, using the same + * function call as EARLIEST_BY's, which means that we can't add a boolean flag within the SQL function that + * distinguishes between the EARLIEST that has been rewritten and the EARLIEST_BY which hasn't been rewritten, because + * Calcite loses that information. + */ + private static boolean isTimeArgModified(DruidExpression druidExpression) + { + return !DruidExpression.ofColumn(ColumnType.LONG, ColumnHolder.TIME_COLUMN_NAME).equals(druidExpression); + } + + /** + * Returns true if the metric passed to the function has already been pre-aggregated using one of the first/last + * aggregators.
+ */ + private static boolean isMetricPreAggregated(RexNode metricRexNode) + { + final RelDataType type = metricRexNode.getType(); if (type instanceof RowSignatures.ComplexSqlType) { String complexColumnTypeName = ((RowSignatures.ComplexSqlType) type).getColumnType().getComplexTypeName(); - if ((SerializablePairLongLongComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) - || SerializablePairLongFloatComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) - || SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) - || SerializablePairLongStringComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName))) { - plannerContext.setPlanningError( - "Cannot call %s with an explicit 'timeExpr' column for pre-aggregated metric of type [%s]. Use %s instead " - + "to further rollup the complex column.", - byAggregator, - complexColumnTypeName, - byAggregator.substring(0, byAggregator.length() - 3) - ); - return true; - } - return false; + return SerializablePairLongLongComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) + || SerializablePairLongFloatComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) + || SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) + || SerializablePairLongStringComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName); } return false; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index cce50e13553f..8fd73d4da18c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -1109,11 +1109,11 @@ public void testNumericLatestEarliestGroupBy() public void testLatestByEarliestByWithRollupColumn() { assertQueryIsUnplannable( - "SELECT EARLIEST_BY(long_last_added, __time) FROM wikipedia_first_last", + "SELECT EARLIEST_BY(long_last_added, TIME_PARSE('2000-01-01')) FROM 
wikipedia_first_last", "Cannot call EARLIEST_BY with an explicit 'timeExpr' column for pre-aggregated metric of type [serializablePairLongLong]. Use EARLIEST instead to further rollup the complex column." ); assertQueryIsUnplannable( - "SELECT LATEST_BY(long_last_added, __time) FROM wikipedia_first_last", + "SELECT LATEST_BY(long_last_added, TIME_PARSE('2000-01-01')) FROM wikipedia_first_last", "Cannot call LATEST_BY with an explicit 'timeExpr' column for pre-aggregated metric of type [serializablePairLongLong]. Use LATEST instead to further rollup the complex column." ); } From c762ff60522854f950e00f3a36093a381b29e089 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 31 Jan 2024 15:13:21 +0530 Subject: [PATCH 3/8] revert some changes --- .../EarliestLatestBySqlAggregator.java | 91 +------------------ .../sql/calcite/CalciteJoinQueryTest.java | 18 ++-- .../druid/sql/calcite/CalciteQueryTest.java | 14 --- 3 files changed, 16 insertions(+), 107 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java index c445bb17f5c4..fac88d853e11 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java @@ -20,7 +20,6 @@ package org.apache.druid.sql.calcite.aggregation.builtin; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlAggFunction; @@ -34,12 +33,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; -import 
org.apache.druid.query.aggregation.SerializablePairLongDoubleComplexMetricSerde; -import org.apache.druid.query.aggregation.SerializablePairLongFloatComplexMetricSerde; -import org.apache.druid.query.aggregation.SerializablePairLongLongComplexMetricSerde; -import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde; import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; -import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; @@ -50,7 +44,6 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.rel.InputAccessor; import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry; -import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; import java.util.Collections; @@ -64,9 +57,7 @@ public class EarliestLatestBySqlAggregator implements SqlAggregator private final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType; private final SqlAggFunction function; - private EarliestLatestBySqlAggregator( - final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType - ) + private EarliestLatestBySqlAggregator(final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType) { this.aggregatorType = aggregatorType; this.function = new EarliestByLatestBySqlAggFunction(aggregatorType); @@ -109,6 +100,7 @@ public Aggregation toDruidAggregation( } final String fieldName = EarliestLatestAnySqlAggregator.getColumnName( + plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0) @@ -117,18 +109,11 @@ public Aggregation toDruidAggregation( final AggregatorFactory theAggFactory; switch (args.size()) { case 2: - if (passedExplicitTimeColumnToPreAggregatedMetric( - plannerContext, - args.get(1), - rexNodes.get(0), - function.getName() - )) { - return null; - } 
theAggFactory = aggregatorType.createAggregatorFactory( aggregatorName, fieldName, EarliestLatestAnySqlAggregator.getColumnName( + plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1) @@ -151,18 +136,11 @@ public Aggregation toDruidAggregation( ); return null; } - if (passedExplicitTimeColumnToPreAggregatedMetric( - plannerContext, - args.get(1), - rexNodes.get(0), - function.getName() - )) { - return null; - } theAggFactory = aggregatorType.createAggregatorFactory( aggregatorName, fieldName, EarliestLatestAnySqlAggregator.getColumnName( + plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1) @@ -187,67 +165,6 @@ public Aggregation toDruidAggregation( ); } - /** - * Returns true if we have passed a time column to a pre-aggregated first/last metric. It is used to detect calls of - * type EARLIEST_BY(firstPreAggregated, customTimestampExpr). This is disallowed, and the caller should instead do - * EARLIEST(firstPreAggregated) to further rollup the column. The time information will get ignored in the native - * layer, therefore the SQL prevents the users from making such calls. - * This also sets a planning error with the appropriate message, and the caller should either error out or refuse - * aggregation if this is true. - */ - private static boolean passedExplicitTimeColumnToPreAggregatedMetric( - PlannerContext plannerContext, - DruidExpression timeColumnDruidExpression, - RexNode metricRexNode, - String aggregatorName - ) - { - if (isTimeArgModified(timeColumnDruidExpression) && isMetricPreAggregated(metricRexNode)) { - final RelDataType type = metricRexNode.getType(); - String complexColumnTypeName = ((RowSignatures.ComplexSqlType) type).getColumnType().getComplexTypeName(); - plannerContext.setPlanningError( - "Cannot call %s with an explicit 'timeExpr' column for pre-aggregated metric of type [%s]. 
Use %s instead " - + "to further rollup the complex column.", - aggregatorName, - complexColumnTypeName, - aggregatorName.substring(0, aggregatorName.length() - 3) - ); - return true; - } - return false; - } - - /** - * Checks whether the given DruidExpression refers a direct column named __time. It's not the perfect check to - * figure out if EARLIEST/LATEST or EARLIEST_BY/LATEST_BY have been used, because this would return true if - * EARLIEST_BY(metric, __time) has been used, which is incorrect. However, it's impossible to figure out which call - * has been made, since EARLIEST gets rewritten to EARLIEST_BY multiple times within Calcite, using the same - * function call as EARLIEST_BY's - which means that we can't add a boolean flag within the SQL function, that - * distinguises between the EARLIEST that has been rewritten, and the EARLIEST_BY which hasn't been rewritten, because - * Calcite loses that information - */ - private static boolean isTimeArgModified(DruidExpression druidExpression) - { - return !DruidExpression.ofColumn(ColumnType.LONG, ColumnHolder.TIME_COLUMN_NAME).equals(druidExpression); - } - - /** - * Returns true if the metric passed to the function has already been pre aggregated using one of the first/last - * aggregators. 
- */ - private static boolean isMetricPreAggregated(RexNode metricRexNode) - { - final RelDataType type = metricRexNode.getType(); - if (type instanceof RowSignatures.ComplexSqlType) { - String complexColumnTypeName = ((RowSignatures.ComplexSqlType) type).getColumnType().getComplexTypeName(); - return SerializablePairLongLongComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) - || SerializablePairLongFloatComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) - || SerializablePairLongDoubleComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName) - || SerializablePairLongStringComplexMetricSerde.TYPE_NAME.equals(complexColumnTypeName); - } - return false; - } - private static class EarliestByLatestBySqlAggFunction extends SqlAggFunction { private static final SqlReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE = diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index 7949badff5d3..5420be74e7f2 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -861,6 +861,8 @@ public void testFilterAndGroupByLookupUsingJoinOperatorWithNotFilter(Map queryContext) { + // MSQ does not support UNION ALL. + msqIncompatible(); // Cannot vectorize JOIN operator. 
cannotVectorize(); @@ -1560,8 +1562,8 @@ public void testInnerJoinQueryOfLookup(Map queryContext) .build() ), ImmutableList.of( - new Object[]{"", "a", "xa", "xa"}, - new Object[]{"1", "a", "xa", "xa"} + new Object[]{"", "a", "xabc", "xabc"}, + new Object[]{"1", "a", "xabc", "xabc"} ) ); } @@ -2515,9 +2517,9 @@ public void testSelectOnLookupUsingRightJoinOperator(Map queryCo ), ImmutableList.of( new Object[]{"abc", "abc", "xabc"}, + new Object[]{NULL_STRING, "6", "x6"}, new Object[]{NULL_STRING, "a", "xa"}, - new Object[]{NULL_STRING, "nosuchkey", "mysteryvalue"}, - new Object[]{NULL_STRING, "6", "x6"} + new Object[]{NULL_STRING, "nosuchkey", "mysteryvalue"} ) ); } @@ -2563,9 +2565,9 @@ public void testSelectOnLookupUsingFullJoinOperator(Map queryCon new Object[]{"1", 4f, 1L, NULL_STRING, NULL_STRING}, new Object[]{"def", 5f, 1L, NULL_STRING, NULL_STRING}, new Object[]{"abc", 6f, 1L, "abc", "xabc"}, + new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "6", "x6"}, new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "a", "xa"}, - new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "nosuchkey", "mysteryvalue"}, - new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "6", "x6"} + new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "nosuchkey", "mysteryvalue"} ) ); } @@ -3839,6 +3841,10 @@ public void testJoinWithExplicitIsNotDistinctFromCondition(Map q @Parameters(source = QueryContextForJoinProvider.class) public void testInnerJoinSubqueryWithSelectorFilter(Map queryContext) { + if (sortBasedJoin) { + // Cannot handle the [l1.k = 'abc'] condition. + msqIncompatible(); + } // Cannot vectorize due to 'concat' expression. 
cannotVectorize(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 8fd73d4da18c..a617a9461943 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -1105,20 +1105,6 @@ public void testNumericLatestEarliestGroupBy() ); } - @Test - public void testLatestByEarliestByWithRollupColumn() - { - assertQueryIsUnplannable( - "SELECT EARLIEST_BY(long_last_added, TIME_PARSE('2000-01-01')) FROM wikipedia_first_last", - "Cannot call EARLIEST_BY with an explicit 'timeExpr' column for pre-aggregated metric of type [serializablePairLongLong]. Use EARLIEST instead to further rollup the complex column." - ); - assertQueryIsUnplannable( - "SELECT LATEST_BY(long_last_added, TIME_PARSE('2000-01-01')) FROM wikipedia_first_last", - "Cannot call LATEST_BY with an explicit 'timeExpr' column for pre-aggregated metric of type [serializablePairLongLong]. Use LATEST instead to further rollup the complex column." 
- ); - } - - @Test public void testNumericLatestEarliestWithOpratorsGroupBy() { From 0690bb69e76ac6ac77533645a239abbd7d6565b7 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 31 Jan 2024 15:31:55 +0530 Subject: [PATCH 4/8] revert some changes --- .../builtin/EarliestLatestAnySqlAggregator.java | 2 +- .../druid/sql/calcite/CalciteJoinQueryTest.java | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java index b47985ea95a9..507eb9fd3320 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java @@ -229,7 +229,7 @@ public Aggregation toDruidAggregation( ); } - final String fieldName = getColumnName(virtualColumnRegistry, args.get(0), rexNodes.get(0)); + final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0)); if (!inputAccessor.getInputRowSignature().contains(ColumnHolder.TIME_COLUMN_NAME) && (aggregatorType == AggregatorType.LATEST || aggregatorType == AggregatorType.EARLIEST)) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index 5420be74e7f2..c43e298d847f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -1562,8 +1562,8 @@ public void testInnerJoinQueryOfLookup(Map queryContext) .build() ), ImmutableList.of( - new Object[]{"", "a", "xabc", "xabc"}, - new Object[]{"1", "a", "xabc", "xabc"} + new Object[]{"", "a", "xa", "xa"}, + new Object[]{"1", "a", "xa", "xa"} ) ); } @@ -2517,9 +2517,9 @@ 
public void testSelectOnLookupUsingRightJoinOperator(Map queryCo ), ImmutableList.of( new Object[]{"abc", "abc", "xabc"}, - new Object[]{NULL_STRING, "6", "x6"}, new Object[]{NULL_STRING, "a", "xa"}, - new Object[]{NULL_STRING, "nosuchkey", "mysteryvalue"} + new Object[]{NULL_STRING, "nosuchkey", "mysteryvalue"}, + new Object[]{NULL_STRING, "6", "x6"} ) ); } @@ -2565,9 +2565,9 @@ public void testSelectOnLookupUsingFullJoinOperator(Map queryCon new Object[]{"1", 4f, 1L, NULL_STRING, NULL_STRING}, new Object[]{"def", 5f, 1L, NULL_STRING, NULL_STRING}, new Object[]{"abc", 6f, 1L, "abc", "xabc"}, - new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "6", "x6"}, new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "a", "xa"}, - new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "nosuchkey", "mysteryvalue"} + new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "nosuchkey", "mysteryvalue"}, + new Object[]{NULL_STRING, NULL_FLOAT, NULL_LONG, "6", "x6"} ) ); } From 508e7cec7b92119148065c6bd51aa72ccae429cd Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 31 Jan 2024 15:32:29 +0530 Subject: [PATCH 5/8] revert some changes --- .../query/aggregation/AggregatorFactory.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java index 9b86244d672c..8567c4b6d6c6 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java @@ -39,24 +39,6 @@ import java.util.Map; /** - * - * SELECT dim1, SUM(m1) FROM foo GROUP BY dim1 - * - * { - * dimensions: dim1 - * metrics: a0 - * aggregators: SumAggregatorFactory() - * } - * - * a 1 - * a 3 - * b 2 - * a 5 - * b 5 - * a-> SumAggregator (1 + 3 - * b -> SumAggregator (2 - * - * * AggregatorFactory is a strategy (in the terms of Design Patterns) that 
represents column aggregation, e.g. min, * max, sum of metric columns, or cardinality of dimension columns (see {@link * org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory}). From 69db68e58b6daeada124326a42bf13d9a4710ea1 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 31 Jan 2024 15:52:38 +0530 Subject: [PATCH 6/8] revert some changes, and doc changes --- docs/querying/sql-aggregations.md | 4 ++-- .../aggregation/builtin/EarliestLatestAnySqlAggregator.java | 2 +- .../aggregation/builtin/EarliestLatestBySqlAggregator.java | 3 --- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/docs/querying/sql-aggregations.md b/docs/querying/sql-aggregations.md index bb0998bf1fb0..82ab80de4921 100644 --- a/docs/querying/sql-aggregations.md +++ b/docs/querying/sql-aggregations.md @@ -87,9 +87,9 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and |`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` or `0` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` or `0` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`EARLIEST(expr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "earliest" is taken from the row with the overall earliest non-null value of the timestamp column.
If the earliest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| -|`EARLIEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`.
If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue`is omitted; it defaults to `1024`. It cannot be used with the rollup tables created with any variant of the EARLIEST/LATEST/EARLIEST_BY/LATEST_BY aggregator since the intermediate type will already contain the timestamp and the one passed explicitly will get ignored by the native engine. USE `EARLIEST` in such scenario. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| +|`EARLIEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`.
If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`.
Use `EARLIEST` instead of `EARLIEST_BY` on a table that has rollup enabled and was created with any variant of `EARLIEST`, `LATEST`, `EARLIEST_BY`, or `LATEST_BY`. In these cases, the intermediate type already stores the timestamp, and Druid ignores the value passed in `timestampExpr`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`LATEST(expr, [maxBytesPerValue])`|Returns the latest value of `expr`
The `expr` must come from a relation with a timestamp column (like `__time` in a Druid datasource) and the "latest" is taken from the row with the overall latest non-null value of the timestamp column.
If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| -|`LATEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the latest value of `expr`.
The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`.
If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue`is omitted; it defaults to `1024`. It cannot be used with the rollup tables created with any variant of the EARLIEST/LATEST/EARLIEST_BY/LATEST_BY aggregator since the intermediate type will already contain the timestamp and the one passed explicitly will get ignored by the native engine. Use `LATEST` in such scenario. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| +|`LATEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the latest value of `expr`.
The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`.
If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`.
Use `LATEST` instead of `LATEST_BY` on a table that has rollup enabled and was created with any variant of `EARLIEST`, `LATEST`, `EARLIEST_BY`, or `LATEST_BY`. In these cases, the intermediate type already stores the timestamp, and Druid ignores the value passed in `timestampExpr`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`ANY_VALUE(expr, [maxBytesPerValue, [aggregateMultipleValues]])`|Returns any value of `expr` including null. This aggregator can simplify and optimize the performance by returning the first encountered value (including `null`).

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted; it defaults to `1024`. `aggregateMultipleValues` is an optional boolean flag controls the behavior of aggregating a [multi-value dimension](./multi-value-dimensions.md). `aggregateMultipleValues` is set as true by default and returns the stringified array in case of a multi-value dimension. By setting it to false, function will return first value instead. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`GROUPING(expr, expr...)`|Returns a number to indicate which groupBy dimension is included in a row, when using `GROUPING SETS`. Refer to [additional documentation](aggregations.md#grouping-aggregator) on how to infer this number.|N/A| |`ARRAY_AGG(expr, [size])`|Collects all values of `expr` into an ARRAY, including null values, with `size` in bytes limit on aggregation size (default of 1024 bytes). If the aggregated array grows larger than the maximum size in bytes, the query will fail. Use of `ORDER BY` within the `ARRAY_AGG` expression is not currently supported, and the ordering of results within the output array may vary depending on processing order.|`null`| diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java index 507eb9fd3320..b47985ea95a9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java @@ -229,7 +229,7 @@ public Aggregation toDruidAggregation( ); } - final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0)); + final String fieldName = getColumnName(virtualColumnRegistry, args.get(0), rexNodes.get(0)); if (!inputAccessor.getInputRowSignature().contains(ColumnHolder.TIME_COLUMN_NAME) && (aggregatorType 
== AggregatorType.LATEST || aggregatorType == AggregatorType.EARLIEST)) { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java index fac88d853e11..c72ad3150ad5 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java @@ -100,7 +100,6 @@ public Aggregation toDruidAggregation( } final String fieldName = EarliestLatestAnySqlAggregator.getColumnName( - plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0) @@ -113,7 +112,6 @@ public Aggregation toDruidAggregation( aggregatorName, fieldName, EarliestLatestAnySqlAggregator.getColumnName( - plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1) @@ -140,7 +138,6 @@ public Aggregation toDruidAggregation( aggregatorName, fieldName, EarliestLatestAnySqlAggregator.getColumnName( - plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1) From a42ede6df09d926261687abb2d16957dfd9f8eea Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 1 Feb 2024 00:50:44 +0530 Subject: [PATCH 7/8] Update docs/querying/sql-aggregations.md Co-authored-by: Victoria Lim --- docs/querying/sql-aggregations.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/querying/sql-aggregations.md b/docs/querying/sql-aggregations.md index 82ab80de4921..45701a93ccf9 100644 --- a/docs/querying/sql-aggregations.md +++ b/docs/querying/sql-aggregations.md @@ -87,7 +87,7 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and |`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. 
See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` or `0` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` or `0` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`EARLIEST(expr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "earliest" is taken from the row with the overall earliest non-null value of the timestamp column.
If the earliest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| -|`EARLIEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`.
If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`.
Use `EARLIEST` instead of `EARLIEST_BY` on a table that has rollup enabled and was created with any variant of `EARLIEST`, `LATEST`, `EARLIEST_BY`, or `LATEST_BY`. In these cases, the intermediate type already stores the timestamp, and Druid ignores the value passed in `timestampExpr`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| +|`EARLIEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`.
If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`.

Use `EARLIEST` instead of `EARLIEST_BY` on a table that has rollup enabled and was created with any variant of `EARLIEST`, `LATEST`, `EARLIEST_BY`, or `LATEST_BY`. In these cases, the intermediate type already stores the timestamp, and Druid ignores the value passed in `timestampExpr`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`LATEST(expr, [maxBytesPerValue])`|Returns the latest value of `expr`
The `expr` must come from a relation with a timestamp column (like `__time` in a Druid datasource) and the "latest" is taken from the row with the overall latest non-null value of the timestamp column.
If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`LATEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the latest value of `expr`.
The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`.
If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted, it defaults to `1024`.
Use `LATEST` instead of `LATEST_BY` on a table that has rollup enabled and was created with any variant of `EARLIEST`, `LATEST`, `EARLIEST_BY`, or `LATEST_BY`. In these cases, the intermediate type already stores the timestamp, and Druid ignores the value passed in `timestampExpr`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`ANY_VALUE(expr, [maxBytesPerValue, [aggregateMultipleValues]])`|Returns any value of `expr` including null. This aggregator can simplify and optimize the performance by returning the first encountered value (including `null`).

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue` is omitted; it defaults to `1024`. `aggregateMultipleValues` is an optional boolean flag controls the behavior of aggregating a [multi-value dimension](./multi-value-dimensions.md). `aggregateMultipleValues` is set as true by default and returns the stringified array in case of a multi-value dimension. By setting it to false, function will return first value instead. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| From 110682bc352602a6cd1a806e9fc0b7f1dfa64d2a Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 1 Feb 2024 00:50:52 +0530 Subject: [PATCH 8/8] Update docs/querying/sql-aggregations.md Co-authored-by: Victoria Lim --- docs/querying/sql-aggregations.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/querying/sql-aggregations.md b/docs/querying/sql-aggregations.md index 45701a93ccf9..5124b75c7798 100644 --- a/docs/querying/sql-aggregations.md +++ b/docs/querying/sql-aggregations.md @@ -89,7 +89,7 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and |`EARLIEST(expr, [maxBytesPerValue])`|Returns the earliest value of `expr`.
If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "earliest" is taken from the row with the overall earliest non-null value of the timestamp column.
If the earliest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.

If `expr` is a string or complex type, `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.<br />
If `maxBytesPerValue` is omitted, it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`EARLIEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the earliest value of `expr`.<br />
The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`.
If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type, `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.<br />
If `maxBytesPerValue` is omitted, it defaults to `1024`.<br />

Use `EARLIEST` instead of `EARLIEST_BY` on a table that has rollup enabled and was created with any variant of `EARLIEST`, `LATEST`, `EARLIEST_BY`, or `LATEST_BY`. In these cases, the intermediate type already stores the timestamp, and Druid ignores the value passed in `timestampExpr`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`LATEST(expr, [maxBytesPerValue])`|Returns the latest value of `expr`.<br />
The `expr` must come from a relation with a timestamp column (like `__time` in a Druid datasource) and the "latest" is taken from the row with the overall latest non-null value of the timestamp column.
If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type, `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.<br />
If `maxBytesPerValue` is omitted, it defaults to `1024`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| -|`LATEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the latest value of `expr`.<br />
The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`.
If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.
If `maxBytesPerValue`is omitted; it defaults to `1024`.
Use `LATEST` instead of `LATEST_BY` on a table that has rollup enabled and was created with any variant of `EARLIEST`, `LATEST`, `EARLIEST_BY`, or `LATEST_BY`. In these cases, the intermediate type already stores the timestamp, and Druid ignores the value passed in `timestampExpr`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| +|`LATEST_BY(expr, timestampExpr, [maxBytesPerValue])`|Returns the latest value of `expr`.
The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`.
If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.

If `expr` is a string or complex type, `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.<br />
If `maxBytesPerValue` is omitted, it defaults to `1024`.<br />

Use `LATEST` instead of `LATEST_BY` on a table that has rollup enabled and was created with any variant of `EARLIEST`, `LATEST`, `EARLIEST_BY`, or `LATEST_BY`. In these cases, the intermediate type already stores the timestamp, and Druid ignores the value passed in `timestampExpr`. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`ANY_VALUE(expr, [maxBytesPerValue, [aggregateMultipleValues]])`|Returns any value of `expr` including null. This aggregator can simplify and optimize performance by returning the first encountered value (including `null`).<br />

If `expr` is a string or complex type, `maxBytesPerValue` amount of space is allocated for the aggregation. Strings longer than this limit are truncated. The `maxBytesPerValue` parameter should be set as low as possible, since high values will lead to wasted memory.<br />
If `maxBytesPerValue` is omitted, it defaults to `1024`. `aggregateMultipleValues` is an optional boolean flag that controls the behavior of aggregating a [multi-value dimension](./multi-value-dimensions.md). `aggregateMultipleValues` is set to true by default and returns the stringified array in case of a multi-value dimension. By setting it to false, the function returns the first value instead. |`null` or `0`/`''` if `druid.generic.useDefaultValueForNull=true` (legacy mode)| |`GROUPING(expr, expr...)`|Returns a number to indicate which groupBy dimension is included in a row, when using `GROUPING SETS`. Refer to [additional documentation](aggregations.md#grouping-aggregator) on how to infer this number.|N/A| |`ARRAY_AGG(expr, [size])`|Collects all values of `expr` into an ARRAY, including null values, with a `size` limit in bytes on the aggregation (default of 1024 bytes). If the aggregated array grows larger than the maximum size in bytes, the query will fail. Use of `ORDER BY` within the `ARRAY_AGG` expression is not currently supported, and the ordering of results within the output array may vary depending on processing order.|`null`|
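As a sketch of how the `LATEST_BY` and `LATEST` guidance above might look in practice, consider the following Druid SQL query. The datasource `sensor_readings` and the columns `sensor_id`, `reading`, and `updated_at` are hypothetical names chosen for illustration, not names from the documentation:

```sql
-- Latest reading per sensor, chosen by a secondary timestamp column.
-- All table and column names here are hypothetical.
SELECT
  sensor_id,
  LATEST_BY(reading, updated_at, 1024) AS latest_reading
FROM sensor_readings
GROUP BY sensor_id
```

On a rollup datasource that was itself created with any variant of `EARLIEST`, `LATEST`, `EARLIEST_BY`, or `LATEST_BY`, the intermediate type already stores the timestamp, so `LATEST(reading)` should be used instead; any `timestampExpr` passed to `LATEST_BY` is ignored in that case.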