diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java index 3d745831b910..c69a06520986 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java @@ -69,6 +69,7 @@ public CardinalityVectorProcessor makeLongProcessor(ColumnCapabilities capabilit @Override public CardinalityVectorProcessor makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector) { - return NilCardinalityVectorProcessor.INSTANCE; + // Handles string-as-object and complex types. + return new StringObjectCardinalityVectorProcessor(selector); } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java deleted file mode 100644 index 42f1e0be50c0..000000000000 --- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.cardinality.vector; - -import javax.annotation.Nullable; -import java.nio.ByteBuffer; - -public class NilCardinalityVectorProcessor implements CardinalityVectorProcessor -{ - public static final NilCardinalityVectorProcessor INSTANCE = new NilCardinalityVectorProcessor(); - - @Override - public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) - { - // Do nothing. - } - - @Override - public void aggregate( - ByteBuffer buf, - int numRows, - int[] positions, - @Nullable int[] rows, - int positionOffset - ) - { - // Do nothing. - } -} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/StringObjectCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/StringObjectCardinalityVectorProcessor.java new file mode 100644 index 000000000000..f083e6decee0 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/StringObjectCardinalityVectorProcessor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality.vector; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.List; + +public class StringObjectCardinalityVectorProcessor implements CardinalityVectorProcessor +{ + private final VectorObjectSelector selector; + + public StringObjectCardinalityVectorProcessor(final VectorObjectSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final Object[] vector = selector.getObjectVector(); + + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + + for (int i = startRow; i < endRow; i++) { + addObjectIfString(collector, vector[i]); + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final Object[] vector = selector.getObjectVector(); + + for (int i = 0; i < numRows; i++) { + final Object obj = vector[rows != null ? rows[i] : i]; + + if (NullHandling.replaceWithDefault() || obj != null) { + final int position = positions[i] + positionOffset; + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + addObjectIfString(collector, obj); + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } + + /** + * Adds an Object to a HyperLogLogCollector. If the object is a {@code List} or {@code String} then + * the individual Strings are added to the collector. + * + * If the object is any other type (including null) then behavior depends on null-handling mode: + * + * - In SQL-compatible mode, ignore non-strings and nulls. + * - In replace-with-default mode, treat all non-strings and nulls as empty strings. + */ + private static void addObjectIfString(final HyperLogLogCollector collector, @Nullable final Object obj) + { + if (obj instanceof String) { + StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, (String) obj); + } else if (obj instanceof List) { + //noinspection unchecked + for (String s : (List) obj) { + StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, s); + } + } else { + StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, null); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java index 7ebdb81a99c8..b2df1f9a37dc 100644 --- a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java @@ -41,8 +41,12 @@ public interface VectorColumnProcessorFactory { /** - * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column always has a single value + * Called only if {@link ColumnCapabilities#getType()} is STRING and the underlying column always has a single value * per row. + * + * Note that for STRING-typed columns where the dictionary does not exist or is not expected to be useful, + * {@link #makeObjectProcessor} may be called instead. To handle all string inputs properly, processors must implement + * all three methods (single-value, multi-value, object). */ T makeSingleValueDimensionProcessor( ColumnCapabilities capabilities, @@ -50,8 +54,12 @@ T makeSingleValueDimensionProcessor( ); /** - * Called when {@link ColumnCapabilities#getType()} is STRING and the underlying column may have multiple values + * Called only if {@link ColumnCapabilities#getType()} is STRING and the underlying column may have multiple values * per row. + * + * Note that for STRING-typed columns where the dictionary does not exist or is not expected to be useful, + * {@link #makeObjectProcessor} may be called instead. To handle all string inputs properly, processors must implement + * all three methods (single-value, multi-value, object). */ T makeMultiValueDimensionProcessor( ColumnCapabilities capabilities, @@ -74,7 +82,8 @@ T makeMultiValueDimensionProcessor( T makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector); /** - * Called when {@link ColumnCapabilities#getType()} is COMPLEX. + * Called when {@link ColumnCapabilities#getType()} is COMPLEX. May also be called for STRING typed columns in + * cases where the dictionary does not exist or is not expected to be useful. */ T makeObjectProcessor(@SuppressWarnings("unused") ColumnCapabilities capabilities, VectorObjectSelector selector); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 4139c8ca4723..a861803b832a 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -8855,6 +8855,78 @@ public void testGroupByCardinalityAggOnFloat() TestHelper.assertExpectedObjects(expectedResults, results, "cardinality-agg"); } + @Test + public void testGroupByCardinalityAggOnMultiStringExpression() + { + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setVirtualColumns( + new ExpressionVirtualColumn("v0", "concat(quality,market)", ValueType.STRING, TestExprMacroTable.INSTANCE) + ) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new CardinalityAggregatorFactory( + "numVals", + ImmutableList.of(DefaultDimensionSpec.of("v0")), + false + ) + ) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List expectedResults = Collections.singletonList( + makeRow( + query, + "2011-04-01", + "rows", + 26L, + "numVals", + 13.041435202975777d + ) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "cardinality-agg"); + } + + @Test + public void testGroupByCardinalityAggOnHyperUnique() + { + // Cardinality aggregator on complex columns (like hyperUnique) returns 0. + + GroupByQuery query = makeQueryBuilder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setAggregatorSpecs( + QueryRunnerTestHelper.ROWS_COUNT, + new CardinalityAggregatorFactory( + "cardinality", + ImmutableList.of(DefaultDimensionSpec.of("quality_uniques")), + false + ), + new HyperUniquesAggregatorFactory("hyperUnique", "quality_uniques", false, false) + ) + .setGranularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List expectedResults = Collections.singletonList( + makeRow( + query, + "2011-04-01", + "rows", + 26L, + "cardinality", + NullHandling.replaceWithDefault() ? 1.0002442201269182 : 0.0d, + "hyperUnique", + 9.019833517963864d + ) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, "cardinality-agg"); + } + @Test public void testGroupByLongColumn() { diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 58b2a90596cf..89aff397548d 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -48,9 +48,12 @@ import org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; +import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.extraction.MapLookupExtractor; import org.apache.druid.query.filter.AndDimFilter; @@ -3035,6 +3038,84 @@ public void testTimeseriesWithExpressionAggregator() assertExpectedResults(expectedResults, results); } + @Test + public void testTimeseriesCardinalityAggOnMultiStringExpression() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .virtualColumns( + new ExpressionVirtualColumn("v0", "concat(quality,market)", ValueType.STRING, TestExprMacroTable.INSTANCE) + ) + .aggregators( + QueryRunnerTestHelper.ROWS_COUNT, + new CardinalityAggregatorFactory( + "numVals", + ImmutableList.of(DefaultDimensionSpec.of("v0")), + false + ) + ) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List> expectedResults = Collections.singletonList( + new Result<>( + DateTimes.of("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 26L, + "numVals", + 13.041435202975777d + ) + ) + ) + ); + + Iterable> results = runner.run(QueryPlus.wrap(query)).toList(); + assertExpectedResults(expectedResults, results); + } + + @Test + public void testTimeseriesCardinalityAggOnHyperUnique() + { + // Cardinality aggregator on complex columns (like hyperUnique) returns 0. + + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.DATA_SOURCE) + .intervals(QueryRunnerTestHelper.FIRST_TO_THIRD) + .aggregators( + QueryRunnerTestHelper.ROWS_COUNT, + new CardinalityAggregatorFactory( + "cardinality", + ImmutableList.of(DefaultDimensionSpec.of("quality_uniques")), + false + ), + new HyperUniquesAggregatorFactory("hyperUnique", "quality_uniques", false, false) + ) + .granularity(QueryRunnerTestHelper.ALL_GRAN) + .build(); + + List> expectedResults = Collections.singletonList( + new Result<>( + DateTimes.of("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 26L, + "cardinality", + NullHandling.replaceWithDefault() ? 1.0002442201269182 : 0.0d, + "hyperUnique", + 9.019833517963864d + ) + ) + ) + ); + + Iterable> results = runner.run(QueryPlus.wrap(query)).toList(); + assertExpectedResults(expectedResults, results); + } + private Map makeContext() { return makeContext(ImmutableMap.of()); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index e93af44d30f2..d6d905fb4a91 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -8010,6 +8010,37 @@ public void testApproxCountDistinct() throws Exception ); } + @Test + public void testApproxCountDistinctOnVectorizableSingleStringExpression() throws Exception + { + testQuery( + "SELECT APPROX_COUNT_DISTINCT(dim1 || 'hello') FROM druid.foo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .virtualColumns( + expressionVirtualColumn("v0", "concat(\"dim1\",'hello')", ValueType.STRING) + ) + .aggregators( + aggregators( + new CardinalityAggregatorFactory( + "a0", + null, + dimensions(DefaultDimensionSpec.of("v0")), + false, + true + ) + ) + ) + .context(TIMESERIES_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of(new Object[]{6L}) + ); + } + @Test public void testNestedGroupBy() throws Exception {