diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java index 6fbffa54023e..bed30c85c64b 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java @@ -64,6 +64,7 @@ public class DataSketchesHllBenchmark "hll", null, null, + null, false ); diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml index 607d4da423a7..79a6faf61c5a 100644 --- a/codestyle/spotbugs-exclude.xml +++ b/codestyle/spotbugs-exclude.xml @@ -44,6 +44,7 @@ + diff --git a/core/src/main/java/org/apache/druid/jackson/DefaultTrueJsonIncludeFilter.java b/core/src/main/java/org/apache/druid/jackson/DefaultTrueJsonIncludeFilter.java new file mode 100644 index 000000000000..6f412bbd9657 --- /dev/null +++ b/core/src/main/java/org/apache/druid/jackson/DefaultTrueJsonIncludeFilter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.jackson; + +import com.fasterxml.jackson.annotation.JsonInclude; + +/** + * {@link JsonInclude} filter for boolean values that default to true. + * + * This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs + * exclusions (see spotbugs-exclude.xml). + */ +@SuppressWarnings({"EqualsAndHashcode", "EqualsHashCode"}) +public class DefaultTrueJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode] +{ + @Override + public boolean equals(Object obj) + { + return obj == null || (obj instanceof Boolean && (boolean) obj); + } +} diff --git a/docs/querying/sql-query-context.md b/docs/querying/sql-query-context.md index 550ce3c78c96..caab4772ab69 100644 --- a/docs/querying/sql-query-context.md +++ b/docs/querying/sql-query-context.md @@ -36,13 +36,14 @@ Configure Druid SQL query planning using the parameters in the table below. |Parameter|Description|Default value| |---------|-----------|-------------| |`sqlQueryId`|Unique identifier given to this SQL query. For HTTP client, it will be returned in `X-Druid-SQL-Query-Id` header.

To specify a unique identifier for SQL query, use `sqlQueryId` instead of [`queryId`](query-context.md). Setting `queryId` for a SQL request has no effect. All native queries underlying SQL use an auto-generated `queryId`.|auto-generated| -|`sqlTimeZone`|Sets the time zone for this connection, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|druid.sql.planner.sqlTimeZone on the Broker (default: UTC)| -|`sqlStringifyArrays`|When set to true, result columns which return array values will be serialized into a JSON string in the response instead of as an array (default: true, except for JDBC connections, where it is always false)| -|`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|druid.sql.planner.useApproximateCountDistinct on the Broker (default: true)| -|`useGroupingSetForExactDistinct`|Whether to use grouping sets to execute queries with multiple exact distinct aggregations.|druid.sql.planner.useGroupingSetForExactDistinct on the Broker (default: false)| -|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.md) will be used instead.|druid.sql.planner.useApproximateTopN on the Broker (default: true)| -|`enableTimeBoundaryPlanning`|If true, SQL queries will get converted to TimeBoundary queries wherever possible. TimeBoundary queries are very efficient for min-max calculation on __time column in a datasource |druid.query.default.context.enableTimeBoundaryPlanning on the Broker (default: false)| -|`useNativeQueryExplain`|If true, `EXPLAIN PLAN FOR` will return the explain plan as a JSON representation of equivalent native query(s), else it will return the original version of explain plan generated by Calcite.|`druid.sql.planner.useNativeQueryExplain` on the Broker (default: true)| +|`sqlTimeZone`|Sets the time zone for this connection, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|`druid.sql.planner.sqlTimeZone` on the Broker (default: UTC)| +|`sqlStringifyArrays`|When set to true, result columns which return array values will be serialized into a JSON string in the response instead of as an array|true, except for JDBC connections, where it is always false| +|`useApproximateCountDistinct`|Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.|`druid.sql.planner.useApproximateCountDistinct` on the Broker (default: true)| +|`useGroupingSetForExactDistinct`|Whether to use grouping sets to execute queries with multiple exact distinct aggregations.|`druid.sql.planner.useGroupingSetForExactDistinct` on the Broker (default: false)| +|`useApproximateTopN`|Whether to use approximate [TopN queries](topnquery.md) when a SQL query could be expressed as such. If false, exact [GroupBy queries](groupbyquery.md) will be used instead.|`druid.sql.planner.useApproximateTopN` on the Broker (default: true)| +|`enableTimeBoundaryPlanning`|If true, SQL queries will get converted to TimeBoundary queries wherever possible. TimeBoundary queries are very efficient for min-max calculation on __time column in a datasource |`druid.query.default.context.enableTimeBoundaryPlanning` on the Broker (default: false)| +|`useNativeQueryExplain`|If true, `EXPLAIN PLAN FOR` will return the explain plan as a JSON representation of equivalent native query(s), else it will return the original version of explain plan generated by Calcite.

This property is provided for backwards compatibility. It is not recommended to use this parameter unless you were depending on the older behavior.|`druid.sql.planner.useNativeQueryExplain` on the Broker (default: true)| +|`sqlFinalizeOuterSketches`|If false (default behavior in Druid 25.0.0 and later), `DS_HLL`, `DS_THETA`, and `DS_QUANTILES_SKETCH` return sketches in query results, as documented. If true (default behavior in Druid 24.0.1 and earlier), sketches from these functions are finalized when they appear in query results.

This property is provided for backwards compatibility with behavior in Druid 24.0.1 and earlier. It is not recommended to use this parameter unless you were depending on the older behavior. Instead, use a function that does not return a sketch, such as `APPROX_COUNT_DISTINCT_DS_HLL`, `APPROX_COUNT_DISTINCT_DS_THETA`, `APPROX_QUANTILE_DS`, `DS_THETA_ESTIMATE`, or `DS_GET_QUANTILE`.|`druid.query.default.context.sqlFinalizeOuterSketches` on the Broker (default: false)| ## Setting the query context The query context parameters can be specified as a "context" object in the [JSON API](sql-api.md) or as a [JDBC connection properties object](sql-jdbc.md). diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/SketchQueryContext.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/SketchQueryContext.java new file mode 100644 index 000000000000..977267e1beee --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/SketchQueryContext.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches; + +import org.apache.druid.query.QueryContexts; +import org.apache.druid.sql.calcite.planner.PlannerContext; + +public class SketchQueryContext +{ + public static final String CTX_FINALIZE_OUTER_SKETCHES = "sqlFinalizeOuterSketches"; + public static final boolean DEFAULT_FINALIZE_OUTER_SKETCHES = false; + + public static boolean isFinalizeOuterSketches(final PlannerContext plannerContext) + { + return QueryContexts.getAsBoolean( + CTX_FINALIZE_OUTER_SKETCHES, + plannerContext.queryContextMap().get(CTX_FINALIZE_OUTER_SKETCHES), + DEFAULT_FINALIZE_OUTER_SKETCHES + ); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java index 6e01703f4acb..c518e5ab0548 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java @@ -19,10 +19,12 @@ package org.apache.druid.query.aggregation.datasketches.hll; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.Union; +import org.apache.druid.jackson.DefaultTrueJsonIncludeFilter; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.ObjectAggregateCombiner; @@ -42,6 +44,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory { public static final boolean DEFAULT_ROUND = false; + public static final boolean DEFAULT_SHOULD_FINALIZE = true; public static final int DEFAULT_LG_K = 12; public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4; @@ -52,6 +55,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory private final String fieldName; private final int lgK; private final TgtHllType tgtHllType; + private final boolean shouldFinalize; private final boolean round; HllSketchAggregatorFactory( @@ -59,6 +63,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory final String fieldName, @Nullable final Integer lgK, @Nullable final String tgtHllType, + final Boolean shouldFinalize, final boolean round ) { @@ -66,6 +71,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory this.fieldName = Objects.requireNonNull(fieldName); this.lgK = lgK == null ? DEFAULT_LG_K : lgK; this.tgtHllType = tgtHllType == null ? DEFAULT_TGT_HLL_TYPE : TgtHllType.valueOf(tgtHllType); + this.shouldFinalize = shouldFinalize == null ? DEFAULT_SHOULD_FINALIZE : shouldFinalize; this.round = round; } @@ -95,6 +101,14 @@ public String getTgtHllType() } @JsonProperty + @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = DefaultTrueJsonIncludeFilter.class) + public boolean isShouldFinalize() + { + return shouldFinalize; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_DEFAULT) public boolean isRound() { return round; @@ -114,7 +128,7 @@ public List requiredFields() public List getRequiredColumns() { return Collections.singletonList( - new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), round) + new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), shouldFinalize, round) ); } @@ -179,9 +193,10 @@ public ColumnType getResultType() @Override public Object finalizeComputation(@Nullable final Object object) { - if (object == null) { - return null; + if (!shouldFinalize || object == null) { + return object; } + final HllSketch sketch = (HllSketch) object; final double estimate = sketch.getEstimate(); @@ -201,7 +216,14 @@ public Comparator getComparator() @Override public AggregatorFactory getCombiningFactory() { - return new HllSketchMergeAggregatorFactory(getName(), getName(), getLgK(), getTgtHllType(), isRound()); + return new HllSketchMergeAggregatorFactory( + getName(), + getName(), + getLgK(), + getTgtHllType(), + isShouldFinalize(), + isRound() + ); } @Override @@ -212,51 +234,41 @@ public byte[] getCacheKey() } @Override - public boolean equals(final Object object) + public boolean equals(Object o) { - if (this == object) { + if (this == o) { return true; } - if (object == null || !getClass().equals(object.getClass())) { - return false; - } - final HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) object; - if (!name.equals(that.getName())) { + if (o == null || getClass() != o.getClass()) { return false; } - if (!fieldName.equals(that.getFieldName())) { - return false; - } - if (lgK != that.getLgK()) { - return false; - } - if (!tgtHllType.equals(that.tgtHllType)) { - return false; - } - if (round != that.round) { - return false; - } - return true; + HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) o; + return lgK == that.lgK + && shouldFinalize == that.shouldFinalize + && round == that.round + && Objects.equals(name, that.name) + && Objects.equals(fieldName, that.fieldName) + && tgtHllType == that.tgtHllType; } @Override public int hashCode() { - return Objects.hash(name, fieldName, lgK, tgtHllType); + return Objects.hash(name, fieldName, lgK, tgtHllType, shouldFinalize, round); } @Override public String toString() { - return getClass().getSimpleName() + " {" - + " name=" + name - + ", fieldName=" + fieldName - + ", lgK=" + lgK - + ", tgtHllType=" + tgtHllType - + ", round=" + round - + " }"; + return getClass().getSimpleName() + "{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + ", lgK=" + lgK + + ", tgtHllType=" + tgtHllType + + (shouldFinalize != DEFAULT_SHOULD_FINALIZE ? ", shouldFinalize=" + shouldFinalize : "") + + (round != DEFAULT_ROUND ? ", round=" + round : "") + + '}'; } protected abstract byte getCacheTypeId(); - } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java index 65d27df6e30b..226bda132455 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java @@ -53,10 +53,11 @@ public HllSketchBuildAggregatorFactory( @JsonProperty("fieldName") final String fieldName, @JsonProperty("lgK") @Nullable final Integer lgK, @JsonProperty("tgtHllType") @Nullable final String tgtHllType, + @JsonProperty("shouldFinalize") final Boolean shouldFinalize, @JsonProperty("round") final boolean round ) { - super(name, fieldName, lgK, tgtHllType, round); + super(name, fieldName, lgK, tgtHllType, shouldFinalize, round); } @@ -125,7 +126,14 @@ public int getMaxIntermediateSize() @Override public AggregatorFactory withName(String newName) { - return new HllSketchBuildAggregatorFactory(newName, getFieldName(), getLgK(), getTgtHllType(), isRound()); + return new HllSketchBuildAggregatorFactory( + newName, + getFieldName(), + getLgK(), + getTgtHllType(), + isShouldFinalize(), + isRound() + ); } private void validateInputs(@Nullable ColumnCapabilities capabilities) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java index 8286f8392ff6..995830a641f3 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java @@ -52,10 +52,11 @@ public HllSketchMergeAggregatorFactory( @JsonProperty("fieldName") final String fieldName, @JsonProperty("lgK") @Nullable final Integer lgK, @JsonProperty("tgtHllType") @Nullable final String tgtHllType, + @JsonProperty("shouldFinalize") final Boolean shouldFinalize, @JsonProperty("round") final boolean round ) { - super(name, fieldName, lgK, tgtHllType, round); + super(name, fieldName, lgK, tgtHllType, shouldFinalize, round); } @Override @@ -64,16 +65,19 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre if (other.getName().equals(this.getName()) && other instanceof HllSketchMergeAggregatorFactory) { HllSketchMergeAggregatorFactory castedOther = (HllSketchMergeAggregatorFactory) other; - return new HllSketchMergeAggregatorFactory( - getName(), - getName(), - Math.max(getLgK(), castedOther.getLgK()), - getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? castedOther.getTgtHllType() : getTgtHllType(), - isRound() || castedOther.isRound() - ); - } else { - throw new AggregatorFactoryNotMergeableException(this, other); + if (castedOther.isShouldFinalize() == isShouldFinalize()) { + return new HllSketchMergeAggregatorFactory( + getName(), + getName(), + Math.max(getLgK(), castedOther.getLgK()), + getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? castedOther.getTgtHllType() : getTgtHllType(), + isShouldFinalize(), + isRound() || castedOther.isRound() + ); + } } + + throw new AggregatorFactoryNotMergeableException(this, other); } @Override @@ -134,7 +138,14 @@ public int getMaxIntermediateSize() @Override public AggregatorFactory withName(String newName) { - return new HllSketchMergeAggregatorFactory(newName, getFieldName(), getLgK(), getTgtHllType(), isRound()); + return new HllSketchMergeAggregatorFactory( + newName, + getFieldName(), + getLgK(), + getTgtHllType(), + isShouldFinalize(), + isRound() + ); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctSqlAggregator.java index d47b15a7eaff..5177e9fd53a7 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctSqlAggregator.java @@ -40,6 +40,11 @@ public class HllSketchApproxCountDistinctSqlAggregator extends HllSketchBaseSqlA private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchApproxCountDistinctSqlAggFunction(); + public HllSketchApproxCountDistinctSqlAggregator() + { + super(true); + } + @Override public SqlAggFunction calciteFunction() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java index 2c576327e2c5..0266f1ed5e7e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java @@ -28,6 +28,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory; @@ -51,6 +52,13 @@ public abstract class HllSketchBaseSqlAggregator implements SqlAggregator { private static final boolean ROUND = true; + private final boolean finalizeSketch; + + protected HllSketchBaseSqlAggregator(boolean finalizeSketch) + { + this.finalizeSketch = finalizeSketch; + } + @Nullable @Override public Aggregation toDruidAggregation( @@ -118,12 +126,15 @@ public Aggregation toDruidAggregation( final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name; if (columnArg.isDirectColumnAccess() - && rowSignature.getColumnType(columnArg.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) { + && rowSignature.getColumnType(columnArg.getDirectColumn()) + .map(type -> type.is(ValueType.COMPLEX)) + .orElse(false)) { aggregatorFactory = new HllSketchMergeAggregatorFactory( aggregatorName, columnArg.getDirectColumn(), logK, tgtHllType, + finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), ROUND ); } else { @@ -154,6 +165,7 @@ public Aggregation toDruidAggregation( dimensionSpec.getDimension(), logK, tgtHllType, + finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), ROUND ); } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateOperatorConversion.java index 2379a144ba03..63980972150e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateOperatorConversion.java @@ -81,7 +81,8 @@ public PostAggregator toPostAggregator( plannerContext, rowSignature, operands.get(0), - postAggregatorVisitor + postAggregatorVisitor, + true ); if (firstOperand == null) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java index 9082e00211c7..83a0ce43bf55 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchEstimateWithErrorBoundsOperatorConversion.java @@ -80,7 +80,8 @@ public PostAggregator toPostAggregator( plannerContext, rowSignature, operands.get(0), - postAggregatorVisitor + postAggregatorVisitor, + true ); if (firstOperand == null) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java index f5da4d7c6a88..4dc18e176e2d 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java @@ -38,6 +38,11 @@ public class HllSketchObjectSqlAggregator extends HllSketchBaseSqlAggregator imp private static final SqlAggFunction FUNCTION_INSTANCE = new HllSketchSqlAggFunction(); private static final String NAME = "DS_HLL"; + public HllSketchObjectSqlAggregator() + { + super(false); + } + @Override public SqlAggFunction calciteFunction() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSetUnionOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSetUnionOperatorConversion.java index 2fe6a5e6ebfd..94440f68cfc1 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSetUnionOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSetUnionOperatorConversion.java @@ -94,7 +94,8 @@ public PostAggregator toPostAggregator( plannerContext, rowSignature, operand, - postAggregatorVisitor + postAggregatorVisitor, + true ); if (convertedPostAgg == null) { if (operandCounter == 0) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchToStringOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchToStringOperatorConversion.java index c143c6647c73..65118fc06838 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchToStringOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchToStringOperatorConversion.java @@ -77,7 +77,8 @@ public PostAggregator toPostAggregator( plannerContext, rowSignature, operands.get(0), - postAggregatorVisitor + postAggregatorVisitor, + true ); if (firstOperand == null) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java index 19694a5842c8..10ce17c4c189 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java @@ -20,11 +20,13 @@ package org.apache.druid.query.aggregation.datasketches.quantiles; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import org.apache.datasketches.Util; import org.apache.datasketches.quantiles.DoublesSketch; import org.apache.datasketches.quantiles.DoublesUnion; +import org.apache.druid.jackson.DefaultTrueJsonIncludeFilter; import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; @@ -62,6 +64,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory Comparator.nullsFirst(Comparator.comparingLong(DoublesSketch::getN)); public static final int DEFAULT_K = 128; + public static final boolean DEFAULT_SHOULD_FINALIZE = true; // Used for sketch size estimation. public static final long DEFAULT_MAX_STREAM_LENGTH = 1_000_000_000; @@ -70,6 +73,7 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory private final String fieldName; private final int k; private final long maxStreamLength; + private final boolean shouldFinalize; private final byte cacheTypeId; @JsonCreator @@ -77,10 +81,18 @@ public DoublesSketchAggregatorFactory( @JsonProperty("name") final String name, @JsonProperty("fieldName") final String fieldName, @JsonProperty("k") @Nullable final Integer k, - @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength + @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength, + @JsonProperty("shouldFinalize") @Nullable final Boolean shouldFinalize ) { - this(name, fieldName, k, maxStreamLength, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID); + this( + name, + fieldName, + k, + maxStreamLength, + shouldFinalize, + AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID + ); } @VisibleForTesting @@ -90,7 +102,7 @@ public DoublesSketchAggregatorFactory( @Nullable final Integer k ) { - this(name, fieldName, k, null); + this(name, fieldName, k, null, DEFAULT_SHOULD_FINALIZE); } DoublesSketchAggregatorFactory( @@ -98,6 +110,7 @@ public DoublesSketchAggregatorFactory( final String fieldName, @Nullable final Integer k, @Nullable final Long maxStreamLength, + @Nullable final Boolean shouldFinalize, final byte cacheTypeId ) { @@ -112,6 +125,7 @@ public DoublesSketchAggregatorFactory( this.k = k == null ? DEFAULT_K : k; Util.checkIfPowerOf2(this.k, "k"); this.maxStreamLength = maxStreamLength == null ? DEFAULT_MAX_STREAM_LENGTH : maxStreamLength; + this.shouldFinalize = shouldFinalize == null ? DEFAULT_SHOULD_FINALIZE : shouldFinalize; this.cacheTypeId = cacheTypeId; } @@ -292,6 +306,13 @@ public long getMaxStreamLength() return maxStreamLength; } + @JsonProperty + @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = DefaultTrueJsonIncludeFilter.class) + public boolean isShouldFinalize() + { + return shouldFinalize; + } + @Override public List requiredFields() { @@ -307,7 +328,14 @@ public int guessAggregatorHeapFootprint(long rows) @Override public AggregatorFactory withName(String newName) { - return new DoublesSketchAggregatorFactory(newName, getFieldName(), getK(), getMaxStreamLength(), cacheTypeId); + return new DoublesSketchAggregatorFactory( + newName, + getFieldName(), + getK(), + getMaxStreamLength(), + shouldFinalize, + cacheTypeId + ); } // Quantiles sketches never stop growing, but they do so very slowly. @@ -327,7 +355,8 @@ public List getRequiredColumns() fieldName, fieldName, k, - maxStreamLength + maxStreamLength, + shouldFinalize ) ); } @@ -335,31 +364,40 @@ public List getRequiredColumns() @Override public AggregatorFactory getCombiningFactory() { - return new DoublesSketchMergeAggregatorFactory(name, k, maxStreamLength); + return new DoublesSketchMergeAggregatorFactory(name, k, maxStreamLength, shouldFinalize); } @Override public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException { if (other.getName().equals(this.getName()) && other instanceof DoublesSketchAggregatorFactory) { - // DoublesUnion supports inputs with different k. - // The result will have effective k between the specified k and the minimum k from all input sketches - // to achieve higher accuracy as much as possible. - return new DoublesSketchMergeAggregatorFactory( - name, - Math.max(k, ((DoublesSketchAggregatorFactory) other).k), - maxStreamLength - ); - } else { - throw new AggregatorFactoryNotMergeableException(this, other); + final DoublesSketchAggregatorFactory castedOther = (DoublesSketchAggregatorFactory) other; + + if (castedOther.shouldFinalize == shouldFinalize) { + // DoublesUnion supports inputs with different k. + // The result will have effective k between the specified k and the minimum k from all input sketches + // to achieve higher accuracy as much as possible. + return new DoublesSketchMergeAggregatorFactory( + name, + Math.max(k, castedOther.k), + Math.max(maxStreamLength, castedOther.maxStreamLength), + shouldFinalize + ); + } } + + throw new AggregatorFactoryNotMergeableException(this, other); } @Nullable @Override public Object finalizeComputation(@Nullable final Object object) { - return object == null ? null : ((DoublesSketch) object).getN(); + if (!shouldFinalize || object == null) { + return object; + } + + return ((DoublesSketch) object).getN(); } /** @@ -394,8 +432,11 @@ public boolean equals(Object o) return false; } DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory) o; + + // no need to use cacheTypeId here return k == that.k && maxStreamLength == that.maxStreamLength + && shouldFinalize == that.shouldFinalize && name.equals(that.name) && fieldName.equals(that.fieldName); } @@ -403,7 +444,8 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(name, fieldName, k, maxStreamLength); // no need to use cacheTypeId here + // no need to use cacheTypeId here + return Objects.hash(name, fieldName, k, maxStreamLength, shouldFinalize); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java index 55c39cff924c..cf65d3f49fb8 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java @@ -40,10 +40,11 @@ public class DoublesSketchMergeAggregatorFactory extends DoublesSketchAggregator public DoublesSketchMergeAggregatorFactory( @JsonProperty("name") final String name, @JsonProperty("k") @Nullable final Integer k, - @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength + @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength, + @JsonProperty("shouldFinalize") @Nullable final Boolean shouldFinalize ) { - super(name, name, k, maxStreamLength, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_MERGE_CACHE_TYPE_ID); + super(name, name, k, maxStreamLength, shouldFinalize, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_MERGE_CACHE_TYPE_ID); } @VisibleForTesting @@ -52,7 +53,7 @@ public DoublesSketchMergeAggregatorFactory( @Nullable final Integer k ) { - this(name, k, null); + this(name, k, null, null); } @Override @@ -78,6 +79,6 @@ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFact @Override public AggregatorFactory withName(String newName) { - return new DoublesSketchMergeAggregatorFactory(newName, getK(), getMaxStreamLength()); + return new DoublesSketchMergeAggregatorFactory(newName, getK(), getMaxStreamLength(), isShouldFinalize()); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java index c6729e3036fd..c71185733765 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java @@ -172,7 +172,8 @@ public Aggregation toDruidAggregation( histogramName, input.getDirectColumn(), k, - getMaxStreamLengthFromQueryContext(plannerContext.queryContext()) + getMaxStreamLengthFromQueryContext(plannerContext.queryContext()), + true ); } else { String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( @@ -183,7 +184,8 @@ public Aggregation toDruidAggregation( histogramName, virtualColumnName, k, - getMaxStreamLengthFromQueryContext(plannerContext.queryContext()) + getMaxStreamLengthFromQueryContext(plannerContext.queryContext()), + true ); } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchListArgBaseOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchListArgBaseOperatorConversion.java index c9533ef337ff..f5d46c9d5009 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchListArgBaseOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchListArgBaseOperatorConversion.java @@ -19,17 +19,25 @@ package org.apache.druid.query.aggregation.datasketches.quantiles.sql; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlCallBinding; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperandCountRange; import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlOperandCountRanges; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; +import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Static; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.expression.DruidExpression; @@ -72,47 +80,24 @@ public PostAggregator toPostAggregator( { final List operands = ((RexCall) rexNode).getOperands(); final double[] args = new double[operands.size() - 1]; - PostAggregator inputSketchPostAgg = null; - - int operandCounter = 0; - for (RexNode operand : operands) { - final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator( - plannerContext, - rowSignature, - operand, - postAggregatorVisitor - ); - if (convertedPostAgg == null) { - if (operandCounter > 0) { - try { - if (!operand.isA(SqlKind.LITERAL)) { - return null; - } - double arg = ((Number) RexLiteral.value(operand)).doubleValue(); - args[operandCounter - 1] = arg; - } - catch (ClassCastException cce) { - return null; - } - } else { - return null; - } - } else { - if (operandCounter == 0) { - inputSketchPostAgg = convertedPostAgg; - } else { - if (!operand.isA(SqlKind.LITERAL)) { - return null; - } - } - } - operandCounter++; - } + final PostAggregator inputSketchPostAgg = OperatorConversions.toPostAggregator( + plannerContext, + rowSignature, + operands.get(0), + postAggregatorVisitor, + true + ); if (inputSketchPostAgg == null) { return null; } + for (int i = 1; i < operands.size(); i++) { + RexNode operand = operands.get(i); + double arg = ((Number) RexLiteral.value(operand)).doubleValue(); + args[i - 1] = arg; + } + return makePostAgg( postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(), inputSketchPostAgg, @@ -129,7 +114,7 @@ private SqlFunction makeSqlFunction() factory -> Calcites.createSqlType(factory, SqlTypeName.OTHER) ), null, - OperandTypes.variadic(SqlOperandCountRanges.from(2)), + new DoublesSketchListArgOperandTypeChecker(), SqlFunctionCategory.USER_DEFINED_FUNCTION ); } @@ -141,4 +126,68 @@ public abstract PostAggregator makePostAgg( PostAggregator field, double[] args ); + + /** + * Minimum 2 arguments. 2nd and further arguments must be literal numbers. + */ + private static class DoublesSketchListArgOperandTypeChecker implements SqlOperandTypeChecker + { + private static final int REQUIRED_OPERANDS = 2; + + @Override + public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) + { + for (int i = 1; i < callBinding.operands().size(); i++) { + final SqlNode operand = callBinding.operands().get(i); + final RelDataType operandType = callBinding.getValidator().deriveType(callBinding.getScope(), operand); + + // Verify that 'operand' is a literal number. + if (!SqlUtil.isLiteral(operand)) { + return OperatorConversions.throwOrReturn( + throwOnFailure, + callBinding, + cb -> cb.getValidator() + .newValidationError( + operand, + Static.RESOURCE.argumentMustBeLiteral(callBinding.getOperator().getName()) + ) + ); + } + + if (!SqlTypeFamily.NUMERIC.contains(operandType)) { + return OperatorConversions.throwOrReturn( + throwOnFailure, + callBinding, + SqlCallBinding::newValidationSignatureError + ); + } + } + + return true; + } + + @Override + public SqlOperandCountRange getOperandCountRange() + { + return SqlOperandCountRanges.from(REQUIRED_OPERANDS); + } + + @Override + public String getAllowedSignatures(SqlOperator op, String opName) + { + return StringUtils.format("'%s(sketch, arg1, [arg2, ...])'", opName); + } + + @Override + public Consistency getConsistency() + { + return Consistency.NONE; + } + + @Override + public boolean isOptional(int i) + { + return i + 1 > REQUIRED_OPERANDS; + } + } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java index 04654daaf238..309168d31cec 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java @@ -34,6 +34,7 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -114,7 +115,8 @@ public Aggregation toDruidAggregation( histogramName, input.getDirectColumn(), k, - DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.queryContext()) + DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.queryContext()), + SketchQueryContext.isFinalizeOuterSketches(plannerContext) ); } else { String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression( @@ -125,7 +127,8 @@ public Aggregation toDruidAggregation( histogramName, virtualColumnName, k, - DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.queryContext()) + DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.queryContext()), + SketchQueryContext.isFinalizeOuterSketches(plannerContext) ); } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSingleArgBaseOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSingleArgBaseOperatorConversion.java index e1756507ee53..984c739b4b14 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSingleArgBaseOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSingleArgBaseOperatorConversion.java @@ -74,7 +74,8 @@ public PostAggregator toPostAggregator( plannerContext, rowSignature, operands.get(0), - postAggregatorVisitor + postAggregatorVisitor, + true ); if (firstOperand == null) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSummaryOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSummaryOperatorConversion.java index c0a742cc78ed..22d4b77016e7 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSummaryOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSummaryOperatorConversion.java @@ -77,7 +77,8 @@ public PostAggregator toPostAggregator( plannerContext, rowSignature, operands.get(0), - postAggregatorVisitor + postAggregatorVisitor, + true ); if (firstOperand == null) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java index 119fb016518d..e3de0c071fdb 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java @@ -20,6 +20,7 @@ package org.apache.druid.query.aggregation.datasketches.theta; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Ordering; @@ -32,7 +33,6 @@ import org.apache.druid.segment.column.ColumnType; import javax.annotation.Nullable; - import java.util.Comparator; import java.util.HashSet; import java.util.Map; @@ -122,6 +122,7 @@ public PostAggregator getField() @Nullable @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) public Integer getErrorBoundsStdDev() { return errorBoundsStdDev; diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java index ee5b7bfb4209..41869d5ea509 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java @@ -20,7 +20,9 @@ package org.apache.druid.query.aggregation.datasketches.theta; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.jackson.DefaultTrueJsonIncludeFilter; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.AggregatorUtil; @@ -94,12 +96,14 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre } @JsonProperty + @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = DefaultTrueJsonIncludeFilter.class) public boolean getShouldFinalize() { return shouldFinalize; } @JsonProperty + @JsonInclude(JsonInclude.Include.NON_DEFAULT) public boolean getIsInputThetaSketch() { return isInputThetaSketch; @@ -107,6 +111,7 @@ public boolean getIsInputThetaSketch() @Nullable @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) public Integer getErrorBoundsStdDev() { return errorBoundsStdDev; diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchApproxCountDistinctSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchApproxCountDistinctSqlAggregator.java index cac6cdaee213..fc252f075da3 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchApproxCountDistinctSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchApproxCountDistinctSqlAggregator.java @@ -40,6 +40,11 @@ public class ThetaSketchApproxCountDistinctSqlAggregator extends ThetaSketchBase private static final SqlAggFunction FUNCTION_INSTANCE = new ThetaSketchSqlAggFunction(); + public ThetaSketchApproxCountDistinctSqlAggregator() + { + super(true); + } + @Override public SqlAggFunction calciteFunction() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java index 20485e1deb05..9db5ad92426b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java @@ -28,6 +28,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; import org.apache.druid.query.aggregation.datasketches.theta.SketchAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -48,6 +49,13 @@ public abstract class ThetaSketchBaseSqlAggregator implements SqlAggregator { + private final boolean finalizeSketch; + + protected ThetaSketchBaseSqlAggregator(boolean finalizeSketch) + { + this.finalizeSketch = finalizeSketch; + } + @Nullable @Override public Aggregation toDruidAggregation( @@ -97,12 +105,14 @@ public Aggregation toDruidAggregation( final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name; if (columnArg.isDirectColumnAccess() - && rowSignature.getColumnType(columnArg.getDirectColumn()).map(type -> type.is(ValueType.COMPLEX)).orElse(false)) { + && rowSignature.getColumnType(columnArg.getDirectColumn()) + .map(type -> type.is(ValueType.COMPLEX)) + .orElse(false)) { aggregatorFactory = new SketchMergeAggregatorFactory( aggregatorName, columnArg.getDirectColumn(), sketchSize, - null, + finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), null, null ); @@ -133,7 +143,7 @@ public Aggregation toDruidAggregation( aggregatorName, dimensionSpec.getDimension(), sketchSize, - null, + finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), null, null ); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateOperatorConversion.java index cd943c719287..48d19e5c5f15 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateOperatorConversion.java @@ -77,7 +77,8 @@ public PostAggregator toPostAggregator( plannerContext, rowSignature, operands.get(0), - postAggregatorVisitor + postAggregatorVisitor, + true ); if (firstOperand == null) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java index 206777ac82f7..459a12867bcc 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchEstimateWithErrorBoundsOperatorConversion.java @@ -79,7 +79,8 @@ public PostAggregator toPostAggregator( plannerContext, rowSignature, operands.get(0), - postAggregatorVisitor + postAggregatorVisitor, + true ); if (firstOperand == null) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchObjectSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchObjectSqlAggregator.java index ac2a4bac64eb..d0e95096e7d3 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchObjectSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchObjectSqlAggregator.java @@ -24,9 +24,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlTypeFamily; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; @@ -38,6 +36,11 @@ public class ThetaSketchObjectSqlAggregator extends ThetaSketchBaseSqlAggregator private static final SqlAggFunction FUNCTION_INSTANCE = new ThetaSketchObjectSqlAggFunction(); private static final String NAME = "DS_THETA"; + public ThetaSketchObjectSqlAggregator() + { + super(false); + } + @Override public SqlAggFunction calciteFunction() { @@ -67,7 +70,7 @@ private static class ThetaSketchObjectSqlAggFunction extends SqlAggFunction NAME, null, SqlKind.OTHER_FUNCTION, - ReturnTypes.explicit(SqlTypeName.OTHER), + ThetaSketchSqlOperators.RETURN_TYPE_INFERENCE, InferTypes.VARCHAR_1024, OperandTypes.or( OperandTypes.ANY, diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSetBaseOperatorConversion.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSetBaseOperatorConversion.java index 5ec31f014a56..0afad6cbe6b1 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSetBaseOperatorConversion.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSetBaseOperatorConversion.java @@ -27,9 +27,8 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlOperandCountRanges; -import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.datasketches.theta.SketchSetPostAggregator; @@ -38,7 +37,6 @@ import org.apache.druid.sql.calcite.expression.OperatorConversions; import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor; import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; -import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; import javax.annotation.Nullable; @@ -81,31 +79,24 @@ public PostAggregator toPostAggregator( final List inputPostAggs = new ArrayList<>(); Integer size = null; - int operandCounter = 0; - for (RexNode operand : operands) { - final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator( - plannerContext, - rowSignature, - operand, - postAggregatorVisitor - ); - if (convertedPostAgg == null) { - if (operandCounter == 0) { - try { - if (!operand.isA(SqlKind.LITERAL)) { - return null; - } - size = RexLiteral.intValue(operand); - } - catch (ClassCastException cce) { - return null; - } - } else { + for (int i = 0; i < operands.size(); i++) { + RexNode operand = operands.get(i); + if (i == 0 && operand.isA(SqlKind.LITERAL) && SqlTypeFamily.INTEGER.contains(operand.getType())) { + size = RexLiteral.intValue(operand); + } else { + final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator( + plannerContext, + rowSignature, + operand, + postAggregatorVisitor, + true + ); + + if (convertedPostAgg == null) { return null; + } else { + inputPostAggs.add(convertedPostAgg); } - } else { - inputPostAggs.add(convertedPostAgg); - operandCounter++; } } @@ -122,9 +113,7 @@ private SqlFunction makeSqlFunction() return new SqlFunction( getFunctionName(), SqlKind.OTHER_FUNCTION, - ReturnTypes.explicit( - factory -> Calcites.createSqlType(factory, SqlTypeName.OTHER) - ), + ThetaSketchSqlOperators.RETURN_TYPE_INFERENCE, null, OperandTypes.variadic(SqlOperandCountRanges.from(2)), SqlFunctionCategory.USER_DEFINED_FUNCTION diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlOperators.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlOperators.java new file mode 100644 index 000000000000..3dcfca36d0f7 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlOperators.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.theta.sql; + +import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.druid.query.aggregation.datasketches.theta.SketchModule; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.sql.calcite.table.RowSignatures; + +public class ThetaSketchSqlOperators +{ + public static final SqlReturnTypeInference RETURN_TYPE_INFERENCE = + opBinding -> RowSignatures.makeComplexType( + opBinding.getTypeFactory(), + ColumnType.ofComplex(SketchModule.THETA_SKETCH), + true + ); +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java index 2177177e1241..12198fda6c90 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java @@ -45,6 +45,7 @@ import java.lang.reflect.Modifier; import java.util.Arrays; import java.util.List; +import java.util.regex.Pattern; import java.util.stream.Collectors; public class HllSketchAggregatorFactoryTest @@ -80,6 +81,7 @@ public void testGetRequiredColumns() Assert.assertEquals(FIELD_NAME, aggregatorFactory.getFieldName()); Assert.assertEquals(LG_K, aggregatorFactory.getLgK()); Assert.assertEquals(TGT_HLL_TYPE, aggregatorFactory.getTgtHllType()); + Assert.assertEquals(HllSketchAggregatorFactory.DEFAULT_SHOULD_FINALIZE, aggregatorFactory.isShouldFinalize()); Assert.assertEquals(ROUND, aggregatorFactory.isRound()); } @@ -236,8 +238,13 @@ public void testToString() .collect(Collectors.toList()); for (Field field : toStringFields) { - String expectedToken = formatFieldForToString(field); - Assert.assertTrue("Missing \"" + expectedToken + "\"", string.contains(expectedToken)); + if ("shouldFinalize".equals(field.getName())) { + // Skip; not included in the toString if it has the default value. + continue; + } + + Pattern expectedPattern = testPatternForToString(field); + Assert.assertTrue("Missing \"" + field.getName() + "\"", expectedPattern.matcher(string).find()); } } @@ -256,6 +263,7 @@ public void testResultArraySignature() "col", null, null, + null, false ), new HllSketchBuildAggregatorFactory( @@ -263,6 +271,7 @@ public void testResultArraySignature() "col", null, null, + null, true ), new HllSketchMergeAggregatorFactory( @@ -270,6 +279,7 @@ public void testResultArraySignature() "col", null, null, + null, false ), new HllSketchMergeAggregatorFactory( @@ -277,6 +287,7 @@ public void testResultArraySignature() "col", null, null, + null, true ) ) @@ -319,9 +330,9 @@ private static boolean isToStringField(Field field) return Modifier.isPrivate(modfiers) && !Modifier.isStatic(modfiers) && Modifier.isFinal(modfiers); } - private static String formatFieldForToString(Field field) + private static Pattern testPatternForToString(Field field) { - return " " + field.getName() + "="; + return Pattern.compile("\\b" + Pattern.quote(field.getName()) + "="); } // Helper for testing abstract base class @@ -341,7 +352,7 @@ private static class TestHllSketchAggregatorFactory extends HllSketchAggregatorF boolean round ) { - super(name, fieldName, lgK, tgtHllType, round); + super(name, fieldName, lgK, tgtHllType, null, round); } @Override diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java index fa41db64d841..909a976246f8 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java @@ -254,7 +254,7 @@ public void testPostAggs() throws Exception .setGranularity(Granularities.ALL) .setInterval(Intervals.ETERNITY) .setAggregatorSpecs( - new HllSketchMergeAggregatorFactory("sketch", "sketch", null, null, false) + new HllSketchMergeAggregatorFactory("sketch", "sketch", null, null, null, false) ) .setPostAggregatorSpecs( ImmutableList.of( diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java index 25d0726b779e..ee1bd965e190 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java @@ -31,6 +31,7 @@ public class HllSketchMergeAggregatorFactoryTest private static final String FIELD_NAME = "fieldName"; private static final int LG_K = 2; private static final String TGT_HLL_TYPE = TgtHllType.HLL_6.name(); + private static final boolean SHOULD_FINALIZE = true; private static final boolean ROUND = true; private HllSketchMergeAggregatorFactory targetRound; @@ -39,8 +40,8 @@ public class HllSketchMergeAggregatorFactoryTest @Before public void setUp() { - targetRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, ROUND); - targetNoRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, !ROUND); + targetRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, SHOULD_FINALIZE, ROUND); + targetNoRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, SHOULD_FINALIZE, !ROUND); } @Test(expected = AggregatorFactoryNotMergeableException.class) @@ -51,6 +52,7 @@ public void testGetMergingFactoryBadName() throws Exception FIELD_NAME, LG_K, TGT_HLL_TYPE, + SHOULD_FINALIZE, ROUND ); targetRound.getMergingFactory(other); @@ -64,6 +66,7 @@ public void testGetMergingFactoryBadType() throws Exception FIELD_NAME, LG_K, TGT_HLL_TYPE, + SHOULD_FINALIZE, ROUND ); targetRound.getMergingFactory(other); @@ -78,6 +81,7 @@ public void testGetMergingFactoryOtherSmallerLgK() throws Exception FIELD_NAME, smallerLgK, TGT_HLL_TYPE, + SHOULD_FINALIZE, ROUND ); HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other); @@ -93,6 +97,7 @@ public void testGetMergingFactoryOtherLargerLgK() throws Exception FIELD_NAME, largerLgK, TGT_HLL_TYPE, + SHOULD_FINALIZE, ROUND ); HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other); @@ -108,6 +113,7 @@ public void testGetMergingFactoryOtherSmallerTgtHllType() throws Exception FIELD_NAME, LG_K, smallerTgtHllType, + SHOULD_FINALIZE, ROUND ); HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other); @@ -123,6 +129,7 @@ public void testGetMergingFactoryOtherLargerTgtHllType() throws Exception FIELD_NAME, LG_K, largerTgtHllType, + SHOULD_FINALIZE, ROUND ); HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other); diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregatorTest.java index 4a8b5ec80427..2c4a31ef7164 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregatorTest.java @@ -93,6 +93,7 @@ public void testResultArraySignature() "col", null, null, + null, false ) ) diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java index 42f272c66a52..1bcacdeccff6 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java @@ -36,6 +36,7 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule; @@ -107,6 +108,7 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( "dim1", null, null, + false, ROUND ) ) @@ -209,6 +211,7 @@ public void testApproxCountDistinctHllSketch() "dim2", null, null, + null, ROUND ), new FilteredAggregatorFactory( @@ -217,6 +220,7 @@ public void testApproxCountDistinctHllSketch() "dim2", null, null, + null, ROUND ), BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null)) @@ -226,6 +230,7 @@ public void testApproxCountDistinctHllSketch() "v0", null, null, + null, ROUND ), new HllSketchBuildAggregatorFactory( @@ -233,10 +238,11 @@ public void testApproxCountDistinctHllSketch() "v1", null, null, + null, ROUND ), - new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8", ROUND), - new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, ROUND) + new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8", null, ROUND), + new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, null, ROUND) ) ) .context(QUERY_CONTEXT_DEFAULT) @@ -284,6 +290,7 @@ public void testAvgDailyCountDistinctHllSketch() "cnt", null, null, + null, ROUND ) ) @@ -361,7 +368,7 @@ public void testApproxCountDistinctHllSketchIsRounded() .setGranularity(Granularities.ALL) .setAggregatorSpecs( aggregators( - new HllSketchBuildAggregatorFactory("a0", "m1", null, null, true) + new HllSketchBuildAggregatorFactory("a0", "m1", null, null, null, true) ) ) .setHavingSpec(having(selector("a0", "2", null))) @@ -445,6 +452,7 @@ public void testHllSketchPostAggs() "dim2", null, null, + false, true ), new HllSketchBuildAggregatorFactory( @@ -452,6 +460,7 @@ public void testHllSketchPostAggs() "m1", null, null, + false, true ), new HllSketchBuildAggregatorFactory( @@ -459,6 +468,7 @@ public void testHllSketchPostAggs() "v0", null, null, + false, true ), new HllSketchBuildAggregatorFactory( @@ -466,6 +476,7 @@ public void testHllSketchPostAggs() "v1", null, null, + false, true ), new HllSketchBuildAggregatorFactory( @@ -473,39 +484,37 @@ public void testHllSketchPostAggs() "dim2", null, null, + null, true ) ) ) .postAggregators( ImmutableList.of( - new FieldAccessPostAggregator("p0", "a0"), - new FieldAccessPostAggregator("p1", "a1"), + new HllSketchToEstimatePostAggregator("p1", new FieldAccessPostAggregator("p0", "a0"), false), new HllSketchToEstimatePostAggregator("p3", new FieldAccessPostAggregator("p2", "a0"), false), - new HllSketchToEstimatePostAggregator("p5", new FieldAccessPostAggregator("p4", "a0"), false), - new ExpressionPostAggregator("p6", "(\"p5\" + 1)", null, TestExprMacroTable.INSTANCE), - new HllSketchToEstimatePostAggregator("p8", new FieldAccessPostAggregator("p7", "a2"), false), + new ExpressionPostAggregator("p4", "(\"p3\" + 1)", null, TestExprMacroTable.INSTANCE), + new HllSketchToEstimatePostAggregator("p6", new FieldAccessPostAggregator("p5", "a2"), false), new HllSketchToEstimatePostAggregator( - "p10", - new FieldAccessPostAggregator("p9", "a0"), + "p8", + new FieldAccessPostAggregator("p7", "a0"), false ), - new ExpressionPostAggregator("p11", "abs(\"p10\")", null, TestExprMacroTable.INSTANCE), + new ExpressionPostAggregator("p9", "abs(\"p8\")", null, TestExprMacroTable.INSTANCE), new HllSketchToEstimateWithBoundsPostAggregator( - "p13", - new FieldAccessPostAggregator("p12", "a0"), + "p11", + new FieldAccessPostAggregator("p10", "a0"), 2 ), new HllSketchToEstimateWithBoundsPostAggregator( - "p15", - new FieldAccessPostAggregator("p14", "a0"), + "p13", + new FieldAccessPostAggregator("p12", "a0"), 1 ), - new FieldAccessPostAggregator("p16", "a3"), - new HllSketchToStringPostAggregator("p18", new FieldAccessPostAggregator("p17", "a0")), - new HllSketchToStringPostAggregator("p20", new FieldAccessPostAggregator("p19", "a0")), - new ExpressionPostAggregator("p21", "upper(\"p20\")", null, TestExprMacroTable.INSTANCE), - new HllSketchToEstimatePostAggregator("p23", new FieldAccessPostAggregator("p22", "a0"), true) + new HllSketchToStringPostAggregator("p15", new FieldAccessPostAggregator("p14", "a0")), + new HllSketchToStringPostAggregator("p17", new FieldAccessPostAggregator("p16", "a0")), + new ExpressionPostAggregator("p18", "upper(\"p17\")", null, TestExprMacroTable.INSTANCE), + new HllSketchToEstimatePostAggregator("p20", new FieldAccessPostAggregator("p19", "a0"), true) ) ) .context(QUERY_CONTEXT_DEFAULT) @@ -531,6 +540,167 @@ public void testHllSketchPostAggs() ); } + @Test + public void testHllSketchPostAggsFinalizeOuterSketches() + { + final ImmutableMap queryContext = + ImmutableMap.builder() + .putAll(QUERY_CONTEXT_DEFAULT) + .put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true) + .build(); + + final String sketchSummary = "### HLL SKETCH SUMMARY: \n" + + " Log Config K : 12\n" + + " Hll Target : HLL_4\n" + + " Current Mode : LIST\n" + + " Memory : false\n" + + " LB : 2.0\n" + + " Estimate : 2.000000004967054\n" + + " UB : 2.000099863468538\n" + + " OutOfOrder Flag: false\n" + + " Coupon Count : 2\n"; + + final String otherSketchSummary = "### HLL SKETCH SUMMARY: \n" + + " LOG CONFIG K : 12\n" + + " HLL TARGET : HLL_4\n" + + " CURRENT MODE : LIST\n" + + " MEMORY : FALSE\n" + + " LB : 2.0\n" + + " ESTIMATE : 2.000000004967054\n" + + " UB : 2.000099863468538\n" + + " OUTOFORDER FLAG: FALSE\n" + + " COUPON COUNT : 2\n"; + testQuery( + "SELECT\n" + + " DS_HLL(dim2),\n" + + " DS_HLL(m1),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)) + 1,\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(CONCAT(dim2, 'hello'))),\n" + + " ABS(HLL_SKETCH_ESTIMATE(DS_HLL(dim2))),\n" + + " HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(DS_HLL(dim2), 2),\n" + + " HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(DS_HLL(dim2)),\n" + + " DS_HLL(POWER(ABS(m1 + 100), 2)),\n" + + " APPROX_COUNT_DISTINCT_DS_HLL(dim2),\n" + + " HLL_SKETCH_TO_STRING(DS_HLL(dim2)),\n" + + " UPPER(HLL_SKETCH_TO_STRING(DS_HLL(dim2))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2), true)\n" + + "FROM druid.foo", + queryContext, + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .virtualColumns( + new ExpressionVirtualColumn( + "v0", + "concat(\"dim2\",'hello')", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + ), + new ExpressionVirtualColumn( + "v1", + "pow(abs((\"m1\" + 100)),2)", + ColumnType.DOUBLE, + TestExprMacroTable.INSTANCE + ) + ) + .aggregators( + ImmutableList.of( + new HllSketchBuildAggregatorFactory( + "a0", + "dim2", + null, + null, + null, + true + ), + new HllSketchBuildAggregatorFactory( + "a1", + "m1", + null, + null, + null, + true + ), + new HllSketchBuildAggregatorFactory( + "a2", + "v0", + null, + null, + null, + true + ), + new HllSketchBuildAggregatorFactory( + "a3", + "v1", + null, + null, + null, + true + ), + new HllSketchBuildAggregatorFactory( + "a4", + "dim2", + null, + null, + null, + true + ) + ) + ) + .postAggregators( + ImmutableList.of( + new HllSketchToEstimatePostAggregator("p1", new FieldAccessPostAggregator("p0", "a0"), false), + new HllSketchToEstimatePostAggregator("p3", new FieldAccessPostAggregator("p2", "a0"), false), + new ExpressionPostAggregator("p4", "(\"p3\" + 1)", null, TestExprMacroTable.INSTANCE), + new HllSketchToEstimatePostAggregator("p6", new FieldAccessPostAggregator("p5", "a2"), false), + new HllSketchToEstimatePostAggregator( + "p8", + new FieldAccessPostAggregator("p7", "a0"), + false + ), + new ExpressionPostAggregator("p9", "abs(\"p8\")", null, TestExprMacroTable.INSTANCE), + new HllSketchToEstimateWithBoundsPostAggregator( + "p11", + new FieldAccessPostAggregator("p10", "a0"), + 2 + ), + new HllSketchToEstimateWithBoundsPostAggregator( + "p13", + new FieldAccessPostAggregator("p12", "a0"), + 1 + ), + new HllSketchToStringPostAggregator("p15", new FieldAccessPostAggregator("p14", "a0")), + new HllSketchToStringPostAggregator("p17", new FieldAccessPostAggregator("p16", "a0")), + new ExpressionPostAggregator("p18", "upper(\"p17\")", null, TestExprMacroTable.INSTANCE), + new HllSketchToEstimatePostAggregator("p20", new FieldAccessPostAggregator("p19", "a0"), true) + ) + ) + .context(queryContext) + .build() + ), + ImmutableList.of( + new Object[]{ + "2", + "6", + 2.000000004967054d, + 3.000000004967054d, + 3.000000014901161d, + 2.000000004967054d, + "[2.000000004967054,2.0,2.0001997319422404]", + "[2.000000004967054,2.0,2.000099863468538]", + "6", + 2L, + sketchSummary, + otherSketchSummary, + 2.0 + } + ) + ); + } + @Test public void testtHllSketchPostAggsPostSort() { @@ -561,16 +731,16 @@ public void testtHllSketchPostAggsPostSort() "dim2", null, null, + false, true ) ) ) .postAggregators( ImmutableList.of( - new FieldAccessPostAggregator("p0", "a0"), - new HllSketchToEstimatePostAggregator("p2", new FieldAccessPostAggregator("p1", "a0"), false), - new HllSketchToEstimatePostAggregator("s1", new FieldAccessPostAggregator("s0", "p0"), false), - new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "p0")) + new HllSketchToEstimatePostAggregator("p1", new FieldAccessPostAggregator("p0", "a0"), false), + new HllSketchToEstimatePostAggregator("s1", new FieldAccessPostAggregator("s0", "a0"), false), + new HllSketchToStringPostAggregator("s3", new FieldAccessPostAggregator("s2", "a0")) ) ) .context(QUERY_CONTEXT_DEFAULT) @@ -606,6 +776,7 @@ public void testEmptyTimeseriesResults() "dim2", null, null, + null, true ), new HllSketchBuildAggregatorFactory( @@ -613,13 +784,14 @@ public void testEmptyTimeseriesResults() "dim2", null, null, + false, true ) ) ) .context(QUERY_CONTEXT_DEFAULT) .build()), - ImmutableList.of(new Object[]{0L, "0"}) + ImmutableList.of(new Object[]{0L, "\"AgEHDAMMAAA=\""}) ); } @@ -648,6 +820,7 @@ public void testGroupByAggregatorDefaultValues() "v0", null, null, + null, true ), selector("dim1", "nonexistent", null) @@ -658,6 +831,7 @@ public void testGroupByAggregatorDefaultValues() "v0", null, null, + false, true ), selector("dim1", "nonexistent", null) @@ -667,6 +841,63 @@ public void testGroupByAggregatorDefaultValues() .setContext(QUERY_CONTEXT_DEFAULT) .build() ), + ImmutableList.of(new Object[]{"a", 0L, "\"AgEHDAMMAAA=\""}) + ); + } + + @Test + public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches() + { + final ImmutableMap queryContext = + ImmutableMap.builder() + .putAll(QUERY_CONTEXT_DEFAULT) + .put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true) + .build(); + + testQuery( + "SELECT\n" + + "dim2,\n" + + "APPROX_COUNT_DISTINCT_DS_HLL(dim2) FILTER(WHERE dim1 = 'nonexistent')," + + "DS_HLL(dim2) FILTER(WHERE dim1 = 'nonexistent')" + + "FROM foo WHERE dim2 = 'a' GROUP BY dim2", + queryContext, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setDimFilter(selector("dim2", "a", null)) + .setGranularity(Granularities.ALL) + .setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING)) + .setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING)) + .setAggregatorSpecs( + aggregators( + new FilteredAggregatorFactory( + new HllSketchBuildAggregatorFactory( + "a0", + "v0", + null, + null, + null, + true + ), + selector("dim1", "nonexistent", null) + ), + new FilteredAggregatorFactory( + new HllSketchBuildAggregatorFactory( + "a1", + "v0", + null, + null, + null, + true + ), + selector("dim1", "nonexistent", null) + ) + ) + ) + .setContext(queryContext) + .build() + ), ImmutableList.of(new Object[]{"a", 0L, "0"}) ); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java index fb09425341b4..e628b153e3ea 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java @@ -59,7 +59,8 @@ public void testSerde() throws IOException "myFactory", "myField", 1024, - 1000L + 1000L, + null ); final byte[] json = mapper.writeValueAsBytes(factory); final DoublesSketchAggregatorFactory fromJson = (DoublesSketchAggregatorFactory) mapper.readValue( @@ -76,6 +77,7 @@ public void testDefaultParams() "myFactory", "myField", null, + null, null ); @@ -90,6 +92,7 @@ public void testGuessAggregatorHeapFootprint() "myFactory", "myField", 128, + null, null ); Assert.assertEquals(64, factory.guessAggregatorHeapFootprint(1)); @@ -105,6 +108,7 @@ public void testMaxIntermediateSize() "myFactory", "myField", 128, + null, null ); Assert.assertEquals(24608L, factory.getMaxIntermediateSize()); @@ -113,7 +117,8 @@ public void testMaxIntermediateSize() "myFactory", "myField", 128, - 1_000_000_000_000L + 1_000_000_000_000L, + null ); Assert.assertEquals(34848L, factory.getMaxIntermediateSize()); } @@ -161,7 +166,8 @@ public void testWithName() "myFactory", "myField", 1024, - 1000L + 1000L, + null ); Assert.assertEquals(factory, factory.withName("myFactory")); Assert.assertEquals("newTest", factory.withName("newTest").getName()); diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java index facf45b18745..66fc28102b9d 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java @@ -51,7 +51,8 @@ public void testSerde() throws IOException final DoublesSketchMergeAggregatorFactory factory = new DoublesSketchMergeAggregatorFactory( "myFactory", 1024, - 1000L + 1000L, + null ); final byte[] json = mapper.writeValueAsBytes(factory); final DoublesSketchMergeAggregatorFactory fromJson = (DoublesSketchMergeAggregatorFactory) mapper.readValue( @@ -67,7 +68,8 @@ public void testWithName() final DoublesSketchMergeAggregatorFactory factory = new DoublesSketchMergeAggregatorFactory( "myFactory", 1024, - 1000L + 1000L, + null ); Assert.assertEquals(factory, factory.withName("myFactory")); Assert.assertEquals("newTest", factory.withName("newTest").getName()); diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java index 3d71c0b17374..81ae1a637b9b 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.common.config.NullHandling; @@ -34,6 +35,7 @@ import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule; import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchToCDFPostAggregator; @@ -543,8 +545,8 @@ public void testDoublesSketchPostAggs() .aggregators(ImmutableList.of( new LongSumAggregatorFactory("a0", "cnt"), new DoublesSketchAggregatorFactory("a1:agg", "cnt", 128), - new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128), - new DoublesSketchAggregatorFactory("a3:agg", "v0", 128) + new DoublesSketchAggregatorFactory("a2:agg", "cnt", 128, null, false), + new DoublesSketchAggregatorFactory("a3:agg", "v0", 128, null, false) )) .postAggregators( new DoublesSketchToQuantilePostAggregator( @@ -696,25 +698,24 @@ public void testDoublesSketchPostAggsPostSort() .granularity(Granularities.ALL) .aggregators( ImmutableList.of( - new DoublesSketchAggregatorFactory("a0:agg", "m1", 128) + new DoublesSketchAggregatorFactory("a0:agg", "m1", 128, null, false) ) ) .postAggregators( ImmutableList.of( - new FieldAccessPostAggregator("p0", "a0:agg"), new DoublesSketchToQuantilePostAggregator( - "p2", - new FieldAccessPostAggregator("p1", "a0:agg"), + "p1", + new FieldAccessPostAggregator("p0", "a0:agg"), 0.5 ), new DoublesSketchToQuantilePostAggregator( "s1", - new FieldAccessPostAggregator("s0", "p0"), + new FieldAccessPostAggregator("s0", "a0:agg"), 0.5 ), new DoublesSketchToQuantilePostAggregator( "s3", - new FieldAccessPostAggregator("s2", "p0"), + new FieldAccessPostAggregator("s2", "a0:agg"), 0.9800000190734863 ) ) @@ -750,8 +751,8 @@ public void testEmptyTimeseriesResults() .aggregators(ImmutableList.of( new DoublesSketchAggregatorFactory("a0:agg", "m1", null), new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", null), - new DoublesSketchAggregatorFactory("a2:agg", "m1", null), - new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null) + new DoublesSketchAggregatorFactory("a2:agg", "m1", null, null, false), + new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null, null, false) )) .postAggregators( new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), @@ -760,6 +761,53 @@ public void testEmptyTimeseriesResults() .context(QUERY_CONTEXT_DEFAULT) .build() ), + ImmutableList.of( + new Object[]{ + Double.NaN, + Double.NaN, + "\"AQMIHoAAAAA=\"", + "\"AQMIHoAAAAA=\"" + } + ) + ); + } + + @Test + public void testEmptyTimeseriesResultsWithFinalizeSketches() + { + final ImmutableMap queryContext = + ImmutableMap.builder() + .putAll(QUERY_CONTEXT_DEFAULT) + .put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true) + .build(); + + testQuery( + "SELECT\n" + + "APPROX_QUANTILE_DS(m1, 0.01),\n" + + "APPROX_QUANTILE_DS(qsketch_m1, 0.01),\n" + + "DS_QUANTILES_SKETCH(m1),\n" + + "DS_QUANTILES_SKETCH(qsketch_m1)\n" + + "FROM foo WHERE dim2 = 0", + queryContext, + Collections.singletonList( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC)) + .aggregators(ImmutableList.of( + new DoublesSketchAggregatorFactory("a0:agg", "m1", null), + new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", null), + new DoublesSketchAggregatorFactory("a2:agg", "m1", null, null, true), + new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null, null, true) + )) + .postAggregators( + new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), + new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.01f) + ) + .context(queryContext) + .build() + ), ImmutableList.of( new Object[]{ Double.NaN, @@ -801,24 +849,99 @@ public void testGroupByAggregatorDefaultValues() selector("dim1", "nonexistent", null) ), new FilteredAggregatorFactory( - new DoublesSketchAggregatorFactory("a2:agg", "m1", null), + new DoublesSketchAggregatorFactory("a2:agg", "m1", null, null, false), selector("dim1", "nonexistent", null) ), new FilteredAggregatorFactory( - new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null), + new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null, null, false), selector("dim1", "nonexistent", null) ) ) ) .setPostAggregatorSpecs( ImmutableList.of( - new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), + new DoublesSketchToQuantilePostAggregator( + "a0", + makeFieldAccessPostAgg("a0:agg"), + 0.01f + ), new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.01f) ) ) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), + ImmutableList.of( + new Object[]{ + "a", + Double.NaN, + Double.NaN, + "\"AQMIHoAAAAA=\"", + "\"AQMIHoAAAAA=\"" + } + ) + ); + } + + @Test + public void testGroupByAggregatorDefaultValuesWithFinalizeSketches() + { + final ImmutableMap queryContext = + ImmutableMap.builder() + .putAll(QUERY_CONTEXT_DEFAULT) + .put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true) + .build(); + + testQuery( + "SELECT\n" + + "dim2,\n" + + "APPROX_QUANTILE_DS(m1, 0.01) FILTER(WHERE dim1 = 'nonexistent'),\n" + + "APPROX_QUANTILE_DS(qsketch_m1, 0.01) FILTER(WHERE dim1 = 'nonexistent'),\n" + + "DS_QUANTILES_SKETCH(m1) FILTER(WHERE dim1 = 'nonexistent'),\n" + + "DS_QUANTILES_SKETCH(qsketch_m1) FILTER(WHERE dim1 = 'nonexistent')\n" + + "FROM foo WHERE dim2 = 'a' GROUP BY dim2", + queryContext, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setDimFilter(selector("dim2", "a", null)) + .setGranularity(Granularities.ALL) + .setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING)) + .setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING)) + .setAggregatorSpecs( + aggregators( + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a0:agg", "m1", null), + selector("dim1", "nonexistent", null) + ), + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a1:agg", "qsketch_m1", null), + selector("dim1", "nonexistent", null) + ), + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a2:agg", "m1", null, null, true), + selector("dim1", "nonexistent", null) + ), + new FilteredAggregatorFactory( + new DoublesSketchAggregatorFactory("a3:agg", "qsketch_m1", null, null, true), + selector("dim1", "nonexistent", null) + ) + ) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new DoublesSketchToQuantilePostAggregator( + "a0", + makeFieldAccessPostAgg("a0:agg"), + 0.01f + ), + new DoublesSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.01f) + ) + ) + .setContext(queryContext) + .build() + ), ImmutableList.of( new Object[]{ "a", @@ -851,8 +974,8 @@ public void testSuccessWithSmallMaxStreamLength() .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) .granularity(Granularities.ALL) .aggregators(ImmutableList.of( - new DoublesSketchAggregatorFactory("a0:agg", "m1", null, 1L), - new DoublesSketchAggregatorFactory("a1:agg", "cnt", null, 1L) + new DoublesSketchAggregatorFactory("a0:agg", "m1", null, 1L, null), + new DoublesSketchAggregatorFactory("a1:agg", "cnt", null, 1L, null) )) .postAggregators( new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.01f), @@ -860,13 +983,13 @@ public void testSuccessWithSmallMaxStreamLength() ) .context(context) .build() - ), - ImmutableList.of( - new Object[]{ - 1.0, - 1.0 - } - ) + ), + ImmutableList.of( + new Object[]{ + 1.0, + 1.0 + } + ) ); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java index ef2fea417735..784166afa78b 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java @@ -21,12 +21,15 @@ import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.guice.ExpressionModule; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; @@ -34,6 +37,7 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; import org.apache.druid.query.aggregation.datasketches.theta.SketchEstimatePostAggregator; import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.theta.SketchModule; @@ -42,7 +46,6 @@ import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; import org.apache.druid.query.dimension.DefaultDimensionSpec; -import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; @@ -64,9 +67,11 @@ import org.apache.druid.timeline.partition.LinearShardSpec; import org.joda.time.DateTimeZone; import org.joda.time.Period; +import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -75,12 +80,30 @@ public class ThetaSketchSqlAggregatorTest extends BaseCalciteQueryTest { private static final String DATA_SOURCE = "foo"; + private ExprMacroTable macroTable; + + @Before + public void setUp() + { + macroTable = createMacroTable(); + } + @Override public Iterable getJacksonModules() { return Iterables.concat(super.getJacksonModules(), new SketchModule().getJacksonModules()); } + @Override + public ExprMacroTable createMacroTable() + { + final List exprMacros = new ArrayList<>(); + for (Class clazz : ExpressionModule.EXPR_MACROS) { + exprMacros.add(CalciteTests.INJECTOR.getInstance(clazz)); + } + return new ExprMacroTable(exprMacros); + } + @Override public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( QueryRunnerFactoryConglomerate conglomerate @@ -210,13 +233,13 @@ public void testApproxCountDistinctThetaSketch() "v0", "substring(\"dim2\", 0, 1)", ColumnType.STRING, - TestExprMacroTable.INSTANCE + macroTable ), new ExpressionVirtualColumn( "v1", "concat(substring(\"dim2\", 0, 1),'x')", ColumnType.STRING, - TestExprMacroTable.INSTANCE + macroTable ) ) .aggregators( @@ -419,7 +442,184 @@ public void testThetaSketchPostAggs() "v0", "concat(\"dim2\",'hello')", ColumnType.STRING, - TestExprMacroTable.INSTANCE + macroTable + ) + ) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("a0", "cnt"), + new SketchMergeAggregatorFactory( + "a1", + "dim2", + null, + false, + null, + null + ), + new SketchMergeAggregatorFactory( + "a2", + "v0", + null, + false, + null, + null + ), + new SketchMergeAggregatorFactory( + "a3", + "dim1", + null, + false, + null, + null + ) + ) + ) + .postAggregators( + new SketchEstimatePostAggregator( + "p1", + new FieldAccessPostAggregator("p0", "a1"), + null + ), + new SketchEstimatePostAggregator( + "p3", + new FieldAccessPostAggregator("p2", "a2"), + null + ), + new SketchEstimatePostAggregator( + "p5", + new FieldAccessPostAggregator("p4", "a1"), + 10 + ), + new SketchSetPostAggregator( + "p8", + "INTERSECT", + null, + ImmutableList.of( + new FieldAccessPostAggregator("p6", "a1"), + new FieldAccessPostAggregator("p7", "a3") + ) + ), + new SketchSetPostAggregator( + "p11", + "UNION", + null, + ImmutableList.of( + new FieldAccessPostAggregator("p9", "a1"), + new FieldAccessPostAggregator("p10", "a3") + ) + ), + new SketchSetPostAggregator( + "p14", + "NOT", + null, + ImmutableList.of( + new FieldAccessPostAggregator("p12", "a1"), + new FieldAccessPostAggregator("p13", "a3") + ) + ), + new SketchSetPostAggregator( + "p17", + "INTERSECT", + 32768, + ImmutableList.of( + new FieldAccessPostAggregator("p15", "a1"), + new FieldAccessPostAggregator("p16", "a3") + ) + ), + new SketchEstimatePostAggregator( + "p23", + new SketchSetPostAggregator( + "p22", + "INTERSECT", + null, + ImmutableList.of( + new SketchSetPostAggregator( + "p20", + "INTERSECT", + null, + ImmutableList.of( + new FieldAccessPostAggregator("p18", "a1"), + new FieldAccessPostAggregator("p19", "a3") + ) + ), + new FieldAccessPostAggregator("p21", "a1") + ) + ), + null + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + expectedResults + ); + } + + @Test + public void testThetaSketchPostAggsFinalizeOuterSketches() + { + final ImmutableMap queryContext = + ImmutableMap.builder() + .putAll(QUERY_CONTEXT_DEFAULT) + .put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true) + .build(); + + final List expectedResults; + + if (NullHandling.replaceWithDefault()) { + expectedResults = ImmutableList.of( + new Object[]{ + 6L, + 2.0d, + 3.0d, + "{\"estimate\":2.0,\"highBound\":2.0,\"lowBound\":2.0,\"numStdDev\":10}", + "\"AQMDAAA6zJOQxkPsNomrZQ==\"", + "\"AgMDAAAazJMGAAAAAACAP1XTBztMIcMJ+HOoBBne1zKQxkPsNomrZUeWbJt3n+VpF8EdUoUHAXvxsLkOSE0lfQ==\"", + "\"AQMDAAA6zJMXwR1ShQcBew==\"", + "\"AQMDAAA6zJOQxkPsNomrZQ==\"", + 1.0d + } + ); + } else { + expectedResults = ImmutableList.of( + new Object[]{ + 6L, + 2.0d, + 3.0d, + "{\"estimate\":2.0,\"highBound\":2.0,\"lowBound\":2.0,\"numStdDev\":10}", + "\"AQMDAAA6zJOQxkPsNomrZQ==\"", + "\"AgMDAAAazJMGAAAAAACAP1XTBztMIcMJ+HOoBBne1zKQxkPsNomrZUeWbJt3n+VpF8EdUoUHAXvxsLkOSE0lfQ==\"", + "\"AQMDAAA6zJMXwR1ShQcBew==\"", + "\"AQMDAAA6zJOQxkPsNomrZQ==\"", + 1.0d + } + ); + } + + testQuery( + "SELECT\n" + + " SUM(cnt),\n" + + " theta_sketch_estimate(DS_THETA(dim2)),\n" + + " theta_sketch_estimate(DS_THETA(CONCAT(dim2, 'hello'))),\n" + + " theta_sketch_estimate_with_error_bounds(DS_THETA(dim2), 10),\n" + + " THETA_SKETCH_INTERSECT(DS_THETA(dim2), DS_THETA(dim1)),\n" + + " THETA_SKETCH_UNION(DS_THETA(dim2), DS_THETA(dim1)),\n" + + " THETA_SKETCH_NOT(DS_THETA(dim2), DS_THETA(dim1)),\n" + + " THETA_SKETCH_INTERSECT(32768, DS_THETA(dim2), DS_THETA(dim1)),\n" + + " theta_sketch_estimate(THETA_SKETCH_INTERSECT(THETA_SKETCH_INTERSECT(DS_THETA(dim2), DS_THETA(dim1)), DS_THETA(dim2)))\n" + + "FROM druid.foo", + queryContext, + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .virtualColumns( + new ExpressionVirtualColumn( + "v0", + "concat(\"dim2\",'hello')", + ColumnType.STRING, + macroTable ) ) .aggregators( @@ -525,7 +725,7 @@ public void testThetaSketchPostAggs() null ) ) - .context(QUERY_CONTEXT_DEFAULT) + .context(queryContext) .build() ), expectedResults @@ -555,22 +755,21 @@ public void testThetaSketchPostAggsPostSort() "a0", "dim2", null, - null, + false, null, null ) ) ) .postAggregators( - new FieldAccessPostAggregator("p0", "a0"), new SketchEstimatePostAggregator( - "p2", - new FieldAccessPostAggregator("p1", "a0"), + "p1", + new FieldAccessPostAggregator("p0", "a0"), null ), new SketchEstimatePostAggregator( "s1", - new FieldAccessPostAggregator("s0", "p0"), + new FieldAccessPostAggregator("s0", "a0"), null ) ) @@ -582,6 +781,62 @@ public void testThetaSketchPostAggsPostSort() ); } + @Test + public void testThetaSketchPostAggsPostSortFinalizeOuterSketches() + { + final String sql = "SELECT DS_THETA(dim2) as y FROM druid.foo ORDER BY THETA_SKETCH_ESTIMATE(DS_THETA(dim2)) DESC LIMIT 10"; + + final ImmutableMap queryContext = + ImmutableMap.builder() + .putAll(QUERY_CONTEXT_DEFAULT) + .put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true) + .build(); + + final List expectedResults = ImmutableList.of( + new Object[]{ + 2.0d + } + ); + + testQuery( + StringUtils.format("SELECT THETA_SKETCH_ESTIMATE(y) from (%s)", sql), + queryContext, + ImmutableList.of(Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new SketchMergeAggregatorFactory( + "a0", + "dim2", + null, + null, + null, + null + ) + ) + ) + .postAggregators( + new SketchEstimatePostAggregator( + "p1", + new FieldAccessPostAggregator("p0", "a0"), + null + ), + new SketchEstimatePostAggregator( + "s1", + new FieldAccessPostAggregator("s0", "a0"), + null + ) + ) + .context(queryContext) + .build() + + ), + expectedResults + ); + } + @Test public void testEmptyTimeseriesResults() { @@ -620,7 +875,7 @@ public void testEmptyTimeseriesResults() "a2", "dim2", 1024, - null, + false, null, null ), @@ -628,7 +883,7 @@ public void testEmptyTimeseriesResults() "a3", "thetasketch_dim1", 1024, - null, + false, null, null ) @@ -637,7 +892,7 @@ public void testEmptyTimeseriesResults() .context(QUERY_CONTEXT_DEFAULT) .build() ), - ImmutableList.of(new Object[]{0L, 0L, "0.0", "0.0"}) + ImmutableList.of(new Object[]{0L, 0L, "\"AQMDAAAeAAA=\"", "\"AQMDAAAeAAA=\""}) ); } @@ -667,7 +922,7 @@ public void testGroupByAggregatorDefaultValues() "a0", "v0", null, - null, + true, null, null ), @@ -678,7 +933,7 @@ public void testGroupByAggregatorDefaultValues() "a1", "thetasketch_dim1", null, - null, + true, null, null ), @@ -689,7 +944,7 @@ public void testGroupByAggregatorDefaultValues() "a2", "v0", 1024, - null, + false, null, null ), @@ -700,7 +955,7 @@ public void testGroupByAggregatorDefaultValues() "a3", "thetasketch_dim1", 1024, - null, + false, null, null ), @@ -711,6 +966,87 @@ public void testGroupByAggregatorDefaultValues() .setContext(QUERY_CONTEXT_DEFAULT) .build() ), + ImmutableList.of(new Object[]{"a", 0L, 0L, "\"AQMDAAAeAAA=\"", "\"AQMDAAAeAAA=\""}) + ); + } + + @Test + public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches() + { + final ImmutableMap queryContext = + ImmutableMap.builder() + .putAll(QUERY_CONTEXT_DEFAULT) + .put(SketchQueryContext.CTX_FINALIZE_OUTER_SKETCHES, true) + .build(); + + testQuery( + "SELECT\n" + + "dim2,\n" + + " APPROX_COUNT_DISTINCT_DS_THETA(dim2) FILTER(WHERE dim1 = 'nonexistent'),\n" + + " APPROX_COUNT_DISTINCT_DS_THETA(thetasketch_dim1) FILTER(WHERE dim1 = 'nonexistent'),\n" + + " DS_THETA(dim2, 1024) FILTER(WHERE dim1 = 'nonexistent'),\n" + + " DS_THETA(thetasketch_dim1, 1024) FILTER(WHERE dim1 = 'nonexistent')\n" + + "FROM foo WHERE dim2 = 'a' GROUP BY dim2", + queryContext, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setDimFilter(selector("dim2", "a", null)) + .setGranularity(Granularities.ALL) + .setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING)) + .setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING)) + .setAggregatorSpecs( + aggregators( + new FilteredAggregatorFactory( + new SketchMergeAggregatorFactory( + "a0", + "v0", + null, + true, + null, + null + ), + selector("dim1", "nonexistent", null) + ), + new FilteredAggregatorFactory( + new SketchMergeAggregatorFactory( + "a1", + "thetasketch_dim1", + null, + true, + null, + null + ), + selector("dim1", "nonexistent", null) + ), + new FilteredAggregatorFactory( + new SketchMergeAggregatorFactory( + "a2", + "v0", + 1024, + true, + null, + null + ), + selector("dim1", "nonexistent", null) + ), + new FilteredAggregatorFactory( + new SketchMergeAggregatorFactory( + "a3", + "thetasketch_dim1", + 1024, + true, + null, + null + ), + selector("dim1", "nonexistent", null) + ) + ) + ) + .setContext(queryContext) + .build() + ), ImmutableList.of(new Object[]{"a", 0L, 0L, "0.0", "0.0"}) ); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index be953775fda0..3df5c230fc48 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -172,8 +172,8 @@ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExis // FloatSumAggregator combine method takes in two Float but return Double new FloatSumAggregatorFactory("sum_added", "added"), new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null), - new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false), - new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L) + new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false, false), + new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null) }, false ); @@ -266,8 +266,8 @@ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExis new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added"), new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null), - new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false), - new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L) + new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false, false), + new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null) }, false ); @@ -458,8 +458,8 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception fullDatasourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, 0, - 14762, - 14761, + 14586, + 14585, 0, 2, 2, @@ -476,7 +476,7 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception fullDatasourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, 0, - 23156, + 22892, 0, 0, 3, @@ -592,8 +592,8 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception getAndAssertCompactionStatus( fullDatasourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, - 14762, - 14761, + 14586, + 14585, 0, 2, 2, @@ -601,7 +601,7 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception 1, 1, 0); - Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14762"); + Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14586"); // Run compaction again to compact the remaining day // Remaining day compacted (1 new segment). Now both days compacted (2 total) forceTriggerAutoCompaction(2); @@ -612,7 +612,7 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception fullDatasourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, 0, - 23156, + 22892, 0, 0, 3, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java b/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java index c1b0b1b81f2a..31b94ff4966c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/expression/OperatorConversions.java @@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.planner.Calcites; @@ -239,6 +240,7 @@ public static DruidExpression convertCallWithPostAggOperands( * @param rexNode expression meant to be applied on top of the rows * @param postAggregatorVisitor visitor that manages postagg names and tracks postaggs that were created * by the translation + * @param useExpressions whether we should consider {@link ExpressionPostAggregator} as a target * * @return rexNode referring to fields in rowOrder, or null if not possible */ @@ -247,7 +249,8 @@ public static PostAggregator toPostAggregator( final PlannerContext plannerContext, final RowSignature rowSignature, final RexNode rexNode, - final PostAggregatorVisitor postAggregatorVisitor + final PostAggregatorVisitor postAggregatorVisitor, + final boolean useExpressions ) { final SqlKind kind = rexNode.getKind(); @@ -268,17 +271,41 @@ public static PostAggregator toPostAggregator( final SqlOperatorConversion conversion = plannerContext.getOperatorTable() .lookupOperatorConversion(operator); - if (conversion == null) { - return null; - } else { - return conversion.toPostAggregator( + if (conversion != null) { + // Try call-specific translation. + final PostAggregator postAggregator = conversion.toPostAggregator( plannerContext, rowSignature, rexNode, postAggregatorVisitor ); + + if (postAggregator != null) { + return postAggregator; + } + } + } + + if (useExpressions) { + // Try to translate to expression postaggregator. + final DruidExpression druidExpression = Expressions.toDruidExpression( + plannerContext, + rowSignature, + rexNode + ); + + if (druidExpression != null) { + return new ExpressionPostAggregator( + postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(), + druidExpression.getExpression(), + null, + plannerContext.getExprMacroTable() + ); } - } else if (kind == SqlKind.LITERAL) { + } + + // Could not translate. + if (rexNode instanceof RexCall || kind == SqlKind.LITERAL) { return null; } else { throw new IAE("Unknown rexnode kind: " + kind); @@ -722,7 +749,7 @@ public boolean isOptional(int i) } } - private static boolean throwOrReturn( + public static boolean throwOrReturn( final boolean throwOnFailure, final SqlCallBinding callBinding, final Function exceptionMapper diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java index 18ad5212ca79..e66bdd9270ce 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Projection.java @@ -30,7 +30,6 @@ import org.apache.druid.math.expr.ExpressionType; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.post.ExpressionPostAggregator; -import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; @@ -128,7 +127,8 @@ private static void postAggregationHandleOtherKinds( plannerContext, inputRowSignature, postAggregatorRexNode, - postAggregatorVisitor + postAggregatorVisitor, + false ); if (pagg != null) { @@ -166,16 +166,7 @@ private static void handlePostAggregatorExpression( final DruidExpression postAggregatorExpression ) { - if (postAggregatorComplexDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) { - // Direct column access on a COMPLEX column, expressions cannot operate on complex columns, only postaggs - // Wrap the column access in a field access postagg so that other postaggs can use it - final PostAggregator postAggregator = new FieldAccessPostAggregator( - postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(), - postAggregatorExpression.getDirectColumn() - ); - postAggregatorVisitor.addPostAgg(postAggregator); - rowOrder.add(postAggregator.getName()); - } else if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) { + if (postAggregatorDirectColumnIsOk(inputRowSignature, postAggregatorExpression, postAggregatorRexNode)) { // Direct column access, without any type cast as far as Druid's runtime is concerned. // (There might be a SQL-level type cast that we don't care about) rowOrder.add(postAggregatorExpression.getDirectColumn()); @@ -326,38 +317,6 @@ private static boolean postAggregatorDirectColumnIsOk( return toExprType.equals(fromExprType); } - /** - * Returns true if a post-aggregation "expression" can be realized as a direct field access. This is true if it's - * a direct column access that doesn't require an implicit cast. - * - * @param aggregateRowSignature signature of the aggregation - * @param expression post-aggregation expression - * @param rexNode RexNode for the post-aggregation expression - * - * @return yes or no - */ - private static boolean postAggregatorComplexDirectColumnIsOk( - final RowSignature aggregateRowSignature, - final DruidExpression expression, - final RexNode rexNode - ) - { - if (!expression.isDirectColumnAccess()) { - return false; - } - - // Check if a cast is necessary. - final ColumnType toValueType = - aggregateRowSignature.getColumnType(expression.getDirectColumn()) - .orElseThrow( - () -> new ISE("Encountered null type for column[%s]", expression.getDirectColumn()) - ); - - final ColumnType fromValueType = Calcites.getColumnTypeForRelDataType(rexNode.getType()); - - return toValueType.is(ValueType.COMPLEX) && toValueType.equals(fromValueType); - } - public List getPostAggregators() { // If you ever see this error, it probably means a Projection was created in pre-aggregation mode, but then