From 2a8ff88a0df3274d9f936dd72be266256f5d9d7c Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 25 Jul 2023 01:39:30 -0700 Subject: [PATCH 1/2] arrays and aggregators changes: * add support to hll and theta sketch aggs for array inputs, converting arrays to byte[] form. native agg specs have a flag to indicate if ingest time lists should be handled as arrays or mvds * add support to bloom filter and aggregator for array inputs, converting arrays to byte[] form --- .../hll/HllSketchAggregatorFactory.java | 22 +- .../hll/HllSketchBuildAggregatorFactory.java | 24 +- .../datasketches/hll/HllSketchBuildUtil.java | 26 +- .../hll/HllSketchMergeAggregatorFactory.java | 2 +- .../hll/sql/HllSketchBaseSqlAggregator.java | 3 +- .../HllSketchBuildVectorProcessorFactory.java | 41 ++- .../ObjectHllSketchBuildVectorProcessor.java | 6 +- .../datasketches/theta/SketchAggregator.java | 30 +- .../theta/SketchAggregatorFactory.java | 13 +- .../theta/SketchBufferAggregator.java | 6 +- .../theta/SketchMergeAggregatorFactory.java | 27 +- .../theta/SketchVectorAggregator.java | 9 +- .../OldSketchBuildAggregatorFactory.java | 2 +- .../OldSketchMergeAggregatorFactory.java | 2 +- .../sql/ThetaSketchBaseSqlAggregator.java | 6 +- .../hll/HllSketchAggregatorFactoryTest.java | 8 +- .../hll/HllSketchAggregatorTest.java | 170 ++++++++++ .../HllSketchBuildAggregatorFactoryTest.java | 4 +- .../hll/HllSketchBuildUtilTest.java | 4 +- .../HllSketchMergeAggregatorFactoryTest.java | 3 +- .../hll/sql/HllSketchSqlAggregatorTest.java | 216 ++++++++++-- ...UsingSketchMergeAggregatorFactoryTest.java | 2 +- .../theta/SketchAggregationTest.java | 313 +++++++++++++++++- .../theta/SketchAggregatorFactoryTest.java | 8 +- .../SketchToStringPostAggregatorTest.java | 2 +- .../sql/ThetaSketchSqlAggregatorTest.java | 287 ++++++++++++++-- .../bloom/BloomFilterAggregatorFactory.java | 22 +- .../bloom/ObjectBloomFilterAggregator.java | 3 + .../druid/query/filter/BloomDimFilter.java | 30 ++ .../bloom/BloomFilterAggregatorTest.java | 78 ++++- .../query/filter/BloomDimFilterTest.java | 28 +- .../filter/sql/BloomDimFilterSqlTest.java | 1 + .../duty/ITAutoCompactionTest.java | 7 +- .../org/apache/druid/math/expr/ExprEval.java | 21 ++ 34 files changed, 1269 insertions(+), 157 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java index 4bc734dc0051..1f65b0480367 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java @@ -62,6 +62,8 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory private final boolean shouldFinalize; private final boolean round; + private final boolean processAsArray; + HllSketchAggregatorFactory( final String name, final String fieldName, @@ -69,7 +71,8 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory @Nullable final String tgtHllType, @Nullable final StringEncoding stringEncoding, final Boolean shouldFinalize, - final boolean round + final boolean round, + final boolean processAsArray ) { this.name = Objects.requireNonNull(name); @@ -79,6 +82,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory this.stringEncoding = stringEncoding == null ? DEFAULT_STRING_ENCODING : stringEncoding; this.shouldFinalize = shouldFinalize == null ? DEFAULT_SHOULD_FINALIZE : shouldFinalize; this.round = round; + this.processAsArray = processAsArray; } @Override @@ -127,6 +131,13 @@ public boolean isRound() return round; } + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public boolean isProcessAsArray() + { + return processAsArray; + } + @Override public List requiredFields() { @@ -149,7 +160,8 @@ public List getRequiredColumns() tgtHllType.toString(), stringEncoding, shouldFinalize, - round + round, + false ) ); } @@ -284,13 +296,14 @@ public boolean equals(Object o) && Objects.equals(name, that.name) && Objects.equals(fieldName, that.fieldName) && tgtHllType == that.tgtHllType - && stringEncoding == that.stringEncoding; + && stringEncoding == that.stringEncoding + && processAsArray == that.processAsArray; } @Override public int hashCode() { - return Objects.hash(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round); + return Objects.hash(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round, processAsArray); } @Override @@ -304,6 +317,7 @@ public String toString() (stringEncoding != DEFAULT_STRING_ENCODING ? ", stringEncoding=" + stringEncoding : "") + (shouldFinalize != DEFAULT_SHOULD_FINALIZE ? ", shouldFinalize=" + shouldFinalize : "") + (round != DEFAULT_ROUND ? ", round=" + round : "") + + (processAsArray ? ", processAsArray=true" : "") + '}'; } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java index 6d42c678bbb1..ec2bf5a23334 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java @@ -23,9 +23,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; -import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; @@ -54,6 +55,7 @@ public class HllSketchBuildAggregatorFactory extends HllSketchAggregatorFactory { public static final ColumnType TYPE = ColumnType.ofComplex(HllSketchModule.BUILD_TYPE_NAME); + @JsonCreator public HllSketchBuildAggregatorFactory( @JsonProperty("name") final String name, @@ -62,10 +64,11 @@ public HllSketchBuildAggregatorFactory( @JsonProperty("tgtHllType") @Nullable final String tgtHllType, @JsonProperty("stringEncoding") @Nullable final StringEncoding stringEncoding, @JsonProperty("shouldFinalize") final Boolean shouldFinalize, - @JsonProperty("round") final boolean round + @JsonProperty("round") final boolean round, + @JsonProperty("processAsArray") final boolean processAsArray ) { - super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round); + super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round, processAsArray); } @@ -144,7 +147,8 @@ public AggregatorFactory withName(String newName) getTgtHllType(), getStringEncoding(), isShouldFinalize(), - isRound() + isRound(), + isProcessAsArray() ); } @@ -223,12 +227,20 @@ private HllSketchUpdater formulateSketchUpdater(ColumnSelectorFactory columnSele }; break; case ARRAY: - throw InvalidInput.exception("ARRAY types are not supported for hll sketch"); + final ExpressionType expressionType = ExpressionType.fromColumnTypeStrict(capabilities); + updater = sketch -> { + final Object o = selector.getObject(); + if (o != null) { + byte[] bytes = ExprEval.toBytes(expressionType, o); + sketch.get().update(bytes); + } + }; + break; default: updater = sketch -> { Object obj = selector.getObject(); if (obj != null) { - HllSketchBuildUtil.updateSketch(sketch.get(), getStringEncoding(), obj); + HllSketchBuildUtil.updateSketch(sketch.get(), getStringEncoding(), obj, isProcessAsArray()); } }; } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java index bcd4c4eb6d90..0437999ab85a 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; +import org.apache.druid.math.expr.ExprEval; import org.apache.druid.segment.DimensionDictionarySelector; import javax.annotation.Nullable; @@ -33,7 +34,12 @@ public class HllSketchBuildUtil { - public static void updateSketch(final HllSketch sketch, final StringEncoding stringEncoding, final Object value) + public static void updateSketch( + final HllSketch sketch, + final StringEncoding stringEncoding, + final Object value, + final boolean processAsArray + ) { if (value instanceof Integer || value instanceof Long) { sketch.update(((Number) value).longValue()); @@ -41,11 +47,21 @@ public static void updateSketch(final HllSketch sketch, final StringEncoding str sketch.update(((Number) value).doubleValue()); } else if (value instanceof String) { updateSketchWithString(sketch, stringEncoding, (String) value); + } else if (value instanceof Object[] && processAsArray) { + byte[] arrayBytes = ExprEval.toBytesBestEffort(value); + sketch.update(arrayBytes); } else if (value instanceof List) { - // noinspection rawtypes - for (Object entry : (List) value) { - if (entry != null) { - updateSketchWithString(sketch, stringEncoding, entry.toString()); + if (processAsArray) { + final ExprEval eval = ExprEval.bestEffortArray((List) value); + final byte[] arrayBytes = ExprEval.toBytes(eval); + sketch.update(arrayBytes); + } else { + // Lists are treated as multi-value strings, which count each element as a separate distinct value + // noinspection rawtypes + for (Object entry : (List) value) { + if (entry != null) { + updateSketchWithString(sketch, stringEncoding, entry.toString()); + } } } } else if (value instanceof char[]) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java index 833df8ab1a55..20d2a854c8cd 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java @@ -64,7 +64,7 @@ public HllSketchMergeAggregatorFactory( @JsonProperty("round") final boolean round ) { - super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round); + super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round, false); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java index c6dd3e7afa02..ede81f0f21a8 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java @@ -193,7 +193,8 @@ public Aggregation toDruidAggregation( tgtHllType, stringEncoding, finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), - ROUND + ROUND, + inputType.isArray() ); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessorFactory.java index bce83d50d5cf..6e574a86ca50 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessorFactory.java @@ -19,8 +19,11 @@ package org.apache.druid.query.aggregation.datasketches.hll.vector; +import org.apache.datasketches.hll.HllSketch; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper; import org.apache.druid.segment.VectorColumnProcessorFactory; import org.apache.druid.segment.column.ColumnCapabilities; @@ -29,6 +32,9 @@ import org.apache.druid.segment.vector.VectorObjectSelector; import org.apache.druid.segment.vector.VectorValueSelector; +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + public class HllSketchBuildVectorProcessorFactory implements VectorColumnProcessorFactory { private final HllSketchBuildBufferAggregatorHelper helper; @@ -90,7 +96,40 @@ public HllSketchBuildVectorProcessor makeArrayProcessor( VectorObjectSelector selector ) { - throw DruidException.defensive("ARRAY types are not supported for hll sketch"); + final ExpressionType expressionType = ExpressionType.fromColumnTypeStrict(capabilities); + return new HllSketchBuildVectorProcessor() + { + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final Object[] vector = selector.getObjectVector(); + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + for (int i = startRow; i < endRow; i++) { + if (vector[i] != null) { + byte[] bytes = ExprEval.toBytes(expressionType, vector[i]); + sketch.update(bytes); + } + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + final Object[] vector = selector.getObjectVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? rows[i] : i; + final int position = positions[i] + positionOffset; + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + if (vector[idx] != null) { + byte[] bytes = ExprEval.toBytes(expressionType, vector[idx]); + sketch.update(bytes); + } + } + } + }; } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/ObjectHllSketchBuildVectorProcessor.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/ObjectHllSketchBuildVectorProcessor.java index 56eceb15f5c1..51308db4a88d 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/ObjectHllSketchBuildVectorProcessor.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/ObjectHllSketchBuildVectorProcessor.java @@ -59,7 +59,8 @@ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) HllSketchBuildUtil.updateSketch( sketch, stringEncoding, - vector[i] + vector[i], + false ); } } @@ -79,7 +80,8 @@ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable in HllSketchBuildUtil.updateSketch( sketch, stringEncoding, - vector[idx] + vector[idx], + false ); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java index 32344e408294..c614220e4fce 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java @@ -24,6 +24,7 @@ import org.apache.datasketches.theta.Union; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.math.expr.ExprEval; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.BaseObjectColumnValueSelector; @@ -35,14 +36,16 @@ public class SketchAggregator implements Aggregator private final BaseObjectColumnValueSelector selector; private final int size; + private final boolean processAsArray; @Nullable private Union union; - public SketchAggregator(BaseObjectColumnValueSelector selector, int size) + public SketchAggregator(BaseObjectColumnValueSelector selector, int size, boolean processAsArray) { this.selector = selector; this.size = size; + this.processAsArray = processAsArray; } private void initUnion() @@ -61,7 +64,7 @@ public void aggregate() if (union == null) { initUnion(); } - updateUnion(union, update); + updateUnion(union, update, processAsArray); } } @@ -85,7 +88,7 @@ public long aggregateWithSize() initialSketchSize = union.getCurrentBytes(); } - updateUnion(union, update); + updateUnion(union, update, processAsArray); long sketchSizeDelta = union.getCurrentBytes() - initialSketchSize; return sketchSizeDelta + unionSizeDelta; @@ -132,7 +135,7 @@ public void close() union = null; } - static void updateUnion(Union union, Object update) + static void updateUnion(Union union, Object update, boolean processAsArrays) { if (update instanceof SketchHolder) { ((SketchHolder) update).updateUnion(union); @@ -148,12 +151,21 @@ static void updateUnion(Union union, Object update) union.update((int[]) update); } else if (update instanceof long[]) { union.update((long[]) update); + } else if (update instanceof Object[] && processAsArrays) { + final byte[] arrayBytes = ExprEval.toBytesBestEffort(update); + union.update(arrayBytes); } else if (update instanceof List) { - for (Object entry : (List) update) { - if (entry != null) { - final String asString = entry.toString(); - if (!NullHandling.isNullOrEquivalent(asString)) { - union.update(asString); + if (processAsArrays) { + final ExprEval eval = ExprEval.bestEffortArray((List) update); + final byte[] arrayBytes = ExprEval.toBytes(eval); + union.update(arrayBytes); + } else { + for (Object entry : (List) update) { + if (entry != null) { + final String asString = entry.toString(); + if (!NullHandling.isNullOrEquivalent(asString)) { + union.update(asString); + } } } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 211373e873b0..d88b1af7f449 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -65,7 +65,9 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory protected final int size; private final byte cacheId; - public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId) + protected final boolean processAsArray; + + public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId, boolean processAsArray) { this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); this.fieldName = Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); @@ -74,6 +76,7 @@ public SketchAggregatorFactory(String name, String fieldName, Integer size, byte Util.checkIfIntPowerOf2(this.size, "size"); this.cacheId = cacheId; + this.processAsArray = processAsArray; } @SuppressWarnings("unchecked") @@ -85,7 +88,7 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) throw InvalidInput.exception("ARRAY types are not supported for theta sketch"); } BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new SketchAggregator(selector, size); + return new SketchAggregator(selector, size, processAsArray); } @Override @@ -96,7 +99,7 @@ public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory) throw InvalidInput.exception("ARRAY types are not supported for theta sketch"); } BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - final SketchAggregator aggregator = new SketchAggregator(selector, size); + final SketchAggregator aggregator = new SketchAggregator(selector, size, processAsArray); return new AggregatorAndSize(aggregator, aggregator.getInitialSizeBytes()); } @@ -109,13 +112,13 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) throw InvalidInput.exception("ARRAY types are not supported for theta sketch"); } BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); - return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls()); + return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls(), processAsArray); } @Override public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) { - return new SketchVectorAggregator(selectorFactory, fieldName, size, getMaxIntermediateSizeWithNulls()); + return new SketchVectorAggregator(selectorFactory, fieldName, size, getMaxIntermediateSizeWithNulls(), processAsArray); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 34aae3f36e18..787b3f84402b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -31,11 +31,13 @@ public class SketchBufferAggregator implements BufferAggregator { private final BaseObjectColumnValueSelector selector; private final SketchBufferAggregatorHelper helper; + private final boolean processAsArray; - public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize) + public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize, boolean processAsArray) { this.selector = selector; this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); + this.processAsArray = processAsArray; } @Override @@ -53,7 +55,7 @@ public void aggregate(ByteBuffer buf, int position) } Union union = helper.getOrCreateUnion(buf, position); - SketchAggregator.updateUnion(union, update); + SketchAggregator.updateUnion(union, update, processAsArray); } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java index 41869d5ea509..33626448e3dc 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java @@ -46,10 +46,11 @@ public SketchMergeAggregatorFactory( @JsonProperty("size") @Nullable Integer size, @JsonProperty("shouldFinalize") @Nullable Boolean shouldFinalize, @JsonProperty("isInputThetaSketch") @Nullable Boolean isInputThetaSketch, - @JsonProperty("errorBoundsStdDev") @Nullable Integer errorBoundsStdDev + @JsonProperty("errorBoundsStdDev") @Nullable Integer errorBoundsStdDev, + @JsonProperty("processAsArray") boolean processAsArray ) { - super(name, fieldName, size, AggregatorUtil.SKETCH_MERGE_CACHE_TYPE_ID); + super(name, fieldName, size, AggregatorUtil.SKETCH_MERGE_CACHE_TYPE_ID, processAsArray); this.shouldFinalize = (shouldFinalize == null) ? true : shouldFinalize; this.isInputThetaSketch = (isInputThetaSketch == null) ? false : isInputThetaSketch; this.errorBoundsStdDev = errorBoundsStdDev; @@ -65,7 +66,8 @@ public List getRequiredColumns() size, shouldFinalize, isInputThetaSketch, - errorBoundsStdDev + errorBoundsStdDev, + processAsArray ) ); } @@ -73,7 +75,7 @@ public List getRequiredColumns() @Override public AggregatorFactory getCombiningFactory() { - return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, false, errorBoundsStdDev); + return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, false, errorBoundsStdDev, processAsArray); } @Override @@ -88,7 +90,8 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre Math.max(size, castedOther.size), shouldFinalize, false, - errorBoundsStdDev + errorBoundsStdDev, + processAsArray ); } else { throw new AggregatorFactoryNotMergeableException(this, other); @@ -117,6 +120,13 @@ public Integer getErrorBoundsStdDev() return errorBoundsStdDev; } + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public boolean isProcessAsArray() + { + return processAsArray; + } + /** * Finalize the computation on sketch object and returns estimate from underlying * sketch. @@ -178,7 +188,8 @@ public AggregatorFactory withName(String newName) getSize(), getShouldFinalize(), getIsInputThetaSketch(), - getErrorBoundsStdDev() + getErrorBoundsStdDev(), + isProcessAsArray() ); } @@ -212,7 +223,7 @@ public boolean equals(Object o) return false; } - return isInputThetaSketch == that.isInputThetaSketch; + return isInputThetaSketch == that.isInputThetaSketch && processAsArray == that.processAsArray; } @Override @@ -222,6 +233,7 @@ public int hashCode() result = 31 * result + (shouldFinalize ? 1 : 0); result = 31 * result + (isInputThetaSketch ? 1 : 0); result = 31 * result + (errorBoundsStdDev != null ? errorBoundsStdDev.hashCode() : 0); + result = 31 * result + (processAsArray ? 1 : 0); return result; } @@ -235,6 +247,7 @@ public String toString() + ", shouldFinalize=" + shouldFinalize + ", isInputThetaSketch=" + isInputThetaSketch + ", errorBoundsStdDev=" + errorBoundsStdDev + + ", processAsArray=" + processAsArray + "}"; } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java index a862265d561c..43125504177a 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java @@ -33,15 +33,18 @@ public class SketchVectorAggregator implements VectorAggregator { private final SketchBufferAggregatorHelper helper; private final Supplier objectSupplier; + private final boolean processAsArray; SketchVectorAggregator( final VectorColumnSelectorFactory columnSelectorFactory, final String column, final int size, - final int maxIntermediateSize + final int maxIntermediateSize, + final boolean processAsArray ) { this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize); + this.processAsArray = processAsArray; this.objectSupplier = ColumnProcessors.makeVectorProcessor( column, @@ -65,7 +68,7 @@ public void aggregate(final ByteBuffer buf, final int position, final int startR for (int i = startRow; i < endRow; i++) { final Object o = vector[i]; if (o != null) { - SketchAggregator.updateUnion(union, o); + SketchAggregator.updateUnion(union, o, processAsArray); } } } @@ -87,7 +90,7 @@ public void aggregate( if (o != null) { final int position = positions[i] + positionOffset; final Union union = helper.getOrCreateUnion(buf, position); - SketchAggregator.updateUnion(union, o); + SketchAggregator.updateUnion(union, o, processAsArray); } } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/oldapi/OldSketchBuildAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/oldapi/OldSketchBuildAggregatorFactory.java index 2c8d21941eb3..4868fb74d298 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/oldapi/OldSketchBuildAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/oldapi/OldSketchBuildAggregatorFactory.java @@ -35,7 +35,7 @@ public OldSketchBuildAggregatorFactory( @JsonProperty("size") Integer size ) { - super(name, fieldName, size, true, false, null); + super(name, fieldName, size, true, false, null, false); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/oldapi/OldSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/oldapi/OldSketchMergeAggregatorFactory.java index e884b210a3a0..94070ec87a69 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/oldapi/OldSketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/oldapi/OldSketchMergeAggregatorFactory.java @@ -36,7 +36,7 @@ public OldSketchMergeAggregatorFactory( @JsonProperty("shouldFinalize") Boolean shouldFinalize ) { - super(name, fieldName, size, shouldFinalize, true, null); + super(name, fieldName, size, shouldFinalize, true, null, false); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java index 6564b276c971..69d5e58191ac 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchBaseSqlAggregator.java @@ -116,7 +116,8 @@ public Aggregation toDruidAggregation( sketchSize, finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), null, - null + null, + columnArg.getDruidType() != null && columnArg.getDruidType().isArray() ); } else { final RelDataType dataType = columnRexNode.getType(); @@ -147,7 +148,8 @@ public Aggregation toDruidAggregation( sketchSize, finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), null, - null + null, + inputType.isArray() ); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java index 2c4ff635faa4..5d8eb0d5ede3 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java @@ -263,7 +263,7 @@ public void testToString() .collect(Collectors.toList()); for (Field field : toStringFields) { - if ("shouldFinalize".equals(field.getName()) || "stringEncoding".equals(field.getName())) { + if ("shouldFinalize".equals(field.getName()) || "stringEncoding".equals(field.getName()) || "processAsArray".equals(field.getName())) { // Skip; not included in the toString if it has the default value. continue; } @@ -290,6 +290,7 @@ public void testResultArraySignature() null, null, null, + false, false ), new HllSketchBuildAggregatorFactory( @@ -299,7 +300,8 @@ public void testResultArraySignature() null, null, null, - true + true, + false ), new HllSketchMergeAggregatorFactory( "hllMerge", @@ -382,7 +384,7 @@ private static class TestHllSketchAggregatorFactory extends HllSketchAggregatorF boolean round ) { - super(name, fieldName, lgK, tgtHllType, stringEncoding, null, round); + super(name, fieldName, lgK, tgtHllType, stringEncoding, null, round, false); } @Override diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java index b8acb0ce2c22..29adea8b5445 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java @@ -23,21 +23,34 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.ResourceInputSource; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.AggregationTestHelper; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.IncrementalIndexSegment; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.SegmentId; +import org.junit.After; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -46,6 +59,7 @@ import org.junit.runners.Parameterized; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -69,6 +83,8 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest @Rule public final TemporaryFolder timeseriesFolder = new TemporaryFolder(); + private final Closer closer; + public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize, StringEncoding stringEncoding) { HllSketchModule.registerSerde(); @@ -80,6 +96,7 @@ public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize, Stri ); this.vectorize = QueryContexts.Vectorize.fromString(vectorize); this.stringEncoding = stringEncoding; + this.closer = Closer.create(); } @Parameterized.Parameters(name = "groupByConfig = {0}, vectorize = {1}, stringEncoding = {2}") @@ -98,6 +115,12 @@ public static Collection constructorFeeder() return constructors; } + @After + public void teardown() throws IOException + { + closer.close(); + } + @Test public void ingestSketches() throws Exception { @@ -417,6 +440,153 @@ public void testPostAggs() throws Exception Assert.assertEquals(expectedSummary, ((HllSketchHolder) row.get(4)).getSketch().toString()); } + @Test + public void testArrays() throws Exception + { + AggregatorFactory[] aggs = new AggregatorFactory[]{ + new HllSketchBuildAggregatorFactory("hll0", "arrayString", null, null, null, false, false, true), + new HllSketchBuildAggregatorFactory("hll1", "arrayLong", null, null, null, false, false, true), + new HllSketchBuildAggregatorFactory("hll2", "arrayDouble", null, null, null, false, false, true), + new HllSketchBuildAggregatorFactory("hll3", "arrayString", null, null, null, false, false, false), + new HllSketchBuildAggregatorFactory("hll4", "arrayLong", null, null, null, false, false, false), + new HllSketchBuildAggregatorFactory("hll5", "arrayDouble", null, null, null, false, false, false) + }; + + IndexBuilder bob = IndexBuilder.create(timeseriesHelper.getObjectMapper()) + .tmpDir(groupByFolder.newFolder()) + .schema( + IncrementalIndexSchema.builder() + .withTimestampSpec(NestedDataTestUtils.TIMESTAMP_SPEC) + .withDimensionsSpec(NestedDataTestUtils.AUTO_DISCOVERY) + .withMetrics(aggs) + .withQueryGranularity(Granularities.NONE) + .withRollup(true) + .withMinTimestamp(0) + .build() + ) + .inputSource( + ResourceInputSource.of( + NestedDataTestUtils.class.getClassLoader(), + NestedDataTestUtils.ARRAY_TYPES_DATA_FILE + ) + ) + .inputFormat(NestedDataTestUtils.DEFAULT_JSON_INPUT_FORMAT) + .transform(TransformSpec.NONE) + .inputTmpDir(groupByFolder.newFolder()); + + List realtimeSegs = ImmutableList.of( + new IncrementalIndexSegment(bob.buildIncrementalIndex(), SegmentId.dummy("test_datasource")) + ); + List segs = ImmutableList.of( + new QueryableIndexSegment(bob.buildMMappedMergedIndex(), SegmentId.dummy("test_datasource")) + ); + + GroupByQuery query = GroupByQuery.builder() + .setDataSource("test_datasource") + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ETERNITY) + .setAggregatorSpecs( + new HllSketchBuildAggregatorFactory("a0", "arrayString", null, null, null, false, false, false), + new HllSketchBuildAggregatorFactory("a1", "arrayLong", null, null, null, false, false, false), + new HllSketchBuildAggregatorFactory("a2", "arrayDouble", null, null, null, false, false, false), + new HllSketchMergeAggregatorFactory("a3", "hll0", null, null, null, false, false), + new HllSketchMergeAggregatorFactory("a4", "hll1", null, null, null, false, false), + new HllSketchMergeAggregatorFactory("a5", "hll2", null, null, null, false, false), + new HllSketchMergeAggregatorFactory("a6", "hll3", null, null, null, false, false), + new HllSketchMergeAggregatorFactory("a7", "hll4", null, null, null, false, false), + new HllSketchMergeAggregatorFactory("a8", "hll5", null, null, null, false, false), + new CountAggregatorFactory("a9") + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new HllSketchToEstimatePostAggregator( + "p0", + new FieldAccessPostAggregator("f0", "a0"), + false + ), + new HllSketchToEstimatePostAggregator( + "p1", + new FieldAccessPostAggregator("f1", "a1"), + false + ), + new HllSketchToEstimatePostAggregator( + "p2", + new FieldAccessPostAggregator("f2", "a2"), + false + ), + // pre-aggregated array counts + new HllSketchToEstimatePostAggregator( + "p3", + new FieldAccessPostAggregator("f3", "a3"), + false + ), + new HllSketchToEstimatePostAggregator( + "p4", + new FieldAccessPostAggregator("f4", "a4"), + false + ), + new HllSketchToEstimatePostAggregator( + "p5", + new FieldAccessPostAggregator("f5", "a5"), + false + ), + // array element counts + new HllSketchToEstimatePostAggregator( + "p6", + new FieldAccessPostAggregator("f6", "a6"), + false + ), + new HllSketchToEstimatePostAggregator( + "p7", + new FieldAccessPostAggregator("f7", "a7"), + false + ), + new HllSketchToEstimatePostAggregator( + "p8", + new FieldAccessPostAggregator("f8", "a8"), + false + ) + ) + ) + .build(); + + Sequence realtimeSeq = groupByHelper.runQueryOnSegmentsObjs(realtimeSegs, query); + Sequence seq = groupByHelper.runQueryOnSegmentsObjs(segs, query); + List realtimeList = realtimeSeq.toList(); + List list = seq.toList(); + + // expect 4 distinct arrays for each of these columns from 14 rows + Assert.assertEquals(1, realtimeList.size()); + Assert.assertEquals(14L, realtimeList.get(0).get(9)); + // array column estimate counts + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(10), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(11), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(12), 0.01); + // pre-aggregated arrays counts + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(13), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(14), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(15), 0.01); + // if processAsArray is false, count is done as string mvds so it counts the total number of elements + Assert.assertEquals(5.0, (Double) realtimeList.get(0).get(16), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(17), 0.01); + Assert.assertEquals(6.0, (Double) realtimeList.get(0).get(18), 0.01); + + Assert.assertEquals(1, list.size()); + Assert.assertEquals(14L, list.get(0).get(9)); + // array column estimate counts + Assert.assertEquals(4.0, (Double) list.get(0).get(10), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(11), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(12), 0.01); + // pre-aggregated arrays counts + Assert.assertEquals(4.0, (Double) list.get(0).get(13), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(14), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(15), 0.01); + // if processAsArray is false, count is done as string mvds so it counts the total number of elements + Assert.assertEquals(5.0, (Double) list.get(0).get(16), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(17), 0.01); + Assert.assertEquals(6.0, (Double) list.get(0).get(18), 0.01); + } + private static String buildParserJson(List dimensions, List columns) { Map timestampSpec = ImmutableMap.of( diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactoryTest.java index 51ca671cd0d7..8d8ab6fcd82c 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactoryTest.java @@ -50,6 +50,7 @@ public void testSerde() throws IOException TgtHllType.HLL_8.name(), StringEncoding.UTF8, false, + true, true ); @@ -57,7 +58,7 @@ public void testSerde() throws IOException Assert.assertEquals( "{\"type\":\"HLLSketchBuild\",\"name\":\"foo\",\"fieldName\":\"bar\",\"lgK\":18,\"tgtHllType\":\"HLL_8\"," - + "\"stringEncoding\":\"utf8\",\"shouldFinalize\":false,\"round\":true}", + + "\"stringEncoding\":\"utf8\",\"shouldFinalize\":false,\"round\":true,\"processAsArray\":true}", serializedString ); @@ -79,6 +80,7 @@ public void testSerdeWithDefaults() throws IOException null, null, null, + false, false ); diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtilTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtilTest.java index eca5e6f37a4f..98c129d29413 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtilTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtilTest.java @@ -204,12 +204,12 @@ private void updateSketch(final StringEncoding stringEncoding, final Object firs { // first != null check mimics how updateSketch is called: it's always guarded by a null check on the outer value. if (first != null) { - HllSketchBuildUtil.updateSketch(sketch, stringEncoding, first); + HllSketchBuildUtil.updateSketch(sketch, stringEncoding, first, false); } for (final Object o : others) { if (o != null) { - HllSketchBuildUtil.updateSketch(sketch, stringEncoding, o); + HllSketchBuildUtil.updateSketch(sketch, stringEncoding, o, false); } } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java index 101b25b99be0..01bff8952591 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java @@ -93,7 +93,8 @@ public void testGetMergingFactoryBadType() throws Exception TGT_HLL_TYPE, STRING_ENCODING, SHOULD_FINALIZE, - ROUND + ROUND, + false ); targetRound.getMergingFactory(other); } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java index 349f1a57d1c0..0ed7322277bf 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java @@ -19,11 +19,13 @@ package org.apache.druid.query.aggregation.datasketches.hll.sql; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Injector; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.ResourceInputSource; import org.apache.druid.guice.DruidInjectorBuilder; import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.java.util.common.StringUtils; @@ -32,6 +34,7 @@ import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.Druids; +import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -171,13 +174,13 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest */ private static final List EXPECTED_PA_AGGREGATORS = ImmutableList.of( - new HllSketchBuildAggregatorFactory("a0", "dim2", null, null, null, false, true), - new HllSketchBuildAggregatorFactory("a1", "m1", null, null, null, false, true), - new HllSketchBuildAggregatorFactory("a2", "cnt", null, null, null, false, true), - new HllSketchBuildAggregatorFactory("a3", "v0", null, null, null, false, true), - new HllSketchBuildAggregatorFactory("a4", "v1", null, null, null, false, true), - new HllSketchBuildAggregatorFactory("a5", "dim2", null, null, null, true, true), - new HllSketchBuildAggregatorFactory("a6", "dim2", null, null, StringEncoding.UTF8, true, true) + new HllSketchBuildAggregatorFactory("a0", "dim2", null, null, null, false, true, false), + new HllSketchBuildAggregatorFactory("a1", "m1", null, null, null, false, true, false), + new HllSketchBuildAggregatorFactory("a2", "cnt", null, null, null, false, true, false), + new HllSketchBuildAggregatorFactory("a3", "v0", null, null, null, false, true, false), + new HllSketchBuildAggregatorFactory("a4", "v1", null, null, null, false, true, false), + new HllSketchBuildAggregatorFactory("a5", "dim2", null, null, null, true, true, false), + new HllSketchBuildAggregatorFactory("a6", "dim2", null, null, StringEncoding.UTF8, true, true, false) ); /** @@ -187,7 +190,10 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest private static final List EXPECTED_FILTERED_AGGREGATORS = EXPECTED_PA_AGGREGATORS.stream() .limit(5) - .map(factory -> new FilteredAggregatorFactory(factory, equality("dim2", "a", ColumnType.STRING))) + .map(factory -> new FilteredAggregatorFactory( + factory, + equality("dim2", "a", ColumnType.STRING) + )) .collect(Collectors.toList()); /** @@ -254,8 +260,9 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( ) throws IOException { HllSketchModule.registerSerde(); + ObjectMapper mapper = injector.getInstance(ObjectMapper.class); final QueryableIndex index = IndexBuilder - .create() + .create(mapper) .tmpDir(temporaryFolder.newFolder()) .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) .schema( @@ -263,12 +270,12 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( .withMetrics( new CountAggregatorFactory("cnt"), new DoubleSumAggregatorFactory("m1", "m1"), - new HllSketchBuildAggregatorFactory("hllsketch_dim1", "dim1", null, null, null, false, ROUND), - new HllSketchBuildAggregatorFactory("hllsketch_dim3", "dim3", null, null, null, false, false), - new HllSketchBuildAggregatorFactory("hllsketch_m1", "m1", null, null, null, false, ROUND), - new HllSketchBuildAggregatorFactory("hllsketch_f1", "f1", null, null, null, false, ROUND), - new HllSketchBuildAggregatorFactory("hllsketch_l1", "l1", null, null, null, false, ROUND), - new HllSketchBuildAggregatorFactory("hllsketch_d1", "d1", null, null, null, false, ROUND) + new HllSketchBuildAggregatorFactory("hllsketch_dim1", "dim1", null, null, null, false, ROUND, false), + new HllSketchBuildAggregatorFactory("hllsketch_dim3", "dim3", null, null, null, false, false, false), + new HllSketchBuildAggregatorFactory("hllsketch_m1", "m1", null, null, null, false, ROUND, false), + new HllSketchBuildAggregatorFactory("hllsketch_f1", "f1", null, null, null, false, ROUND, false), + new HllSketchBuildAggregatorFactory("hllsketch_l1", "l1", null, null, null, false, ROUND, false), + new HllSketchBuildAggregatorFactory("hllsketch_d1", "d1", null, null, null, false, ROUND, false) ) .withRollup(false) .build() @@ -276,6 +283,36 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( .rows(TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS) .buildMMappedIndex(); + final QueryableIndex indexAllTypesAuto = + IndexBuilder.create(mapper) + .tmpDir(temporaryFolder.newFolder()) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) + .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec()) + .withMetrics( + new HllSketchBuildAggregatorFactory("hll0", "arrayString", null, null, null, false, false, true), + new HllSketchBuildAggregatorFactory("hll1", "arrayLong", null, null, null, false, false, true), + new HllSketchBuildAggregatorFactory("hll2", "arrayDouble", null, null, null, false, false, true), + new HllSketchBuildAggregatorFactory("hll3", "arrayString", null, null, null, false, false, false), + new HllSketchBuildAggregatorFactory("hll4", "arrayLong", null, null, null, false, false, false), + new HllSketchBuildAggregatorFactory("hll5", "arrayDouble", null, null, null, false, false, false), + new CountAggregatorFactory("cnt") + ) + .withRollup(false) + .build() + ) + .inputSource( + ResourceInputSource.of( + NestedDataTestUtils.class.getClassLoader(), + NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE + ) + ) + .inputFormat(TestDataBuilder.DEFAULT_JSON_INPUT_FORMAT) + .inputTmpDir(temporaryFolder.newFolder()) + .buildMMappedIndex(); + return new SpecificSegmentsQuerySegmentWalker(conglomerate).add( DataSegment.builder() .dataSource(CalciteTests.DATASOURCE1) @@ -285,6 +322,15 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( .size(0) .build(), index + ).add( + DataSegment.builder() + .dataSource("all_types") + .interval(indexAllTypesAuto.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(), + indexAllTypesAuto ); } @@ -341,13 +387,13 @@ public void testApproxCountDistinctHllSketch() .aggregators( ImmutableList.of( new LongSumAggregatorFactory("a0", "cnt"), - new HllSketchBuildAggregatorFactory("a1", "dim2", null, null, null, null, ROUND), + new HllSketchBuildAggregatorFactory("a1", "dim2", null, null, null, null, ROUND, false), new FilteredAggregatorFactory( - new HllSketchBuildAggregatorFactory("a2", "dim2", null, null, null, null, ROUND), + new HllSketchBuildAggregatorFactory("a2", "dim2", null, null, null, null, ROUND, false), not(equality("dim2", "", ColumnType.STRING)) ), - new HllSketchBuildAggregatorFactory("a3", "v0", null, null, null, null, ROUND), - new HllSketchBuildAggregatorFactory("a4", "v1", null, null, null, null, ROUND), + new HllSketchBuildAggregatorFactory("a3", "v0", null, null, null, null, ROUND, false), + new HllSketchBuildAggregatorFactory("a4", "v1", null, null, null, null, ROUND, false), new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8", null, null, ROUND), new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, null, null, ROUND), new HllSketchMergeAggregatorFactory("a7", "hllsketch_dim1", 21, "HLL_4", null, null, ROUND) @@ -400,7 +446,8 @@ public void testAvgDailyCountDistinctHllSketch() null, null, null, - ROUND + ROUND, + false ) ) ) @@ -477,7 +524,7 @@ public void testApproxCountDistinctHllSketchIsRounded() .setGranularity(Granularities.ALL) .setAggregatorSpecs( aggregators( - new HllSketchBuildAggregatorFactory("a0", "m1", null, null, null, true, true) + new HllSketchBuildAggregatorFactory("a0", "m1", null, null, null, true, true, false) ) ) .setHavingSpec(having(equality("a0", 2L, ColumnType.LONG))) @@ -727,11 +774,11 @@ public void testHllSketchPostAggsFinalizeOuterSketches() ) .aggregators( ImmutableList.of( - new HllSketchBuildAggregatorFactory("a0", "dim2", null, null, null, true, true), - new HllSketchBuildAggregatorFactory("a1", "m1", null, null, null, true, true), - new HllSketchBuildAggregatorFactory("a2", "v0", null, null, null, true, true), - new HllSketchBuildAggregatorFactory("a3", "v1", null, null, null, true, true), - new HllSketchBuildAggregatorFactory("a4", "dim2", null, null, null, true, true) + new HllSketchBuildAggregatorFactory("a0", "dim2", null, null, null, true, true, false), + new HllSketchBuildAggregatorFactory("a1", "m1", null, null, null, true, true, false), + new HllSketchBuildAggregatorFactory("a2", "v0", null, null, null, true, true, false), + new HllSketchBuildAggregatorFactory("a3", "v1", null, null, null, true, true, false), + new HllSketchBuildAggregatorFactory("a4", "dim2", null, null, null, true, true, false) ) ) .postAggregators( @@ -817,7 +864,8 @@ public void testtHllSketchPostAggsPostSort() null, null, false, - true + true, + false ) ) ) @@ -863,7 +911,8 @@ public void testEmptyTimeseriesResults() null, null, null, - true + true, + false ), new HllSketchBuildAggregatorFactory( "a1", @@ -872,7 +921,8 @@ public void testEmptyTimeseriesResults() null, null, false, - true + true, + false ) ) ) @@ -909,7 +959,8 @@ public void testGroupByAggregatorDefaultValues() null, null, null, - true + true, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ), @@ -921,7 +972,8 @@ public void testGroupByAggregatorDefaultValues() null, null, false, - true + true, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ) @@ -961,11 +1013,11 @@ public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches() .setAggregatorSpecs( aggregators( new FilteredAggregatorFactory( - new HllSketchBuildAggregatorFactory("a0", "v0", null, null, null, null, true), + new HllSketchBuildAggregatorFactory("a0", "v0", null, null, null, null, true, false), equality("dim1", "nonexistent", ColumnType.STRING) ), new FilteredAggregatorFactory( - new HllSketchBuildAggregatorFactory("a1", "v0", null, null, null, null, true), + new HllSketchBuildAggregatorFactory("a1", "v0", null, null, null, null, true, false), equality("dim1", "nonexistent", ColumnType.STRING) ) ) @@ -1260,6 +1312,104 @@ public void testFloatAndDoubleAreConsideredTheSame() ); } + @Test + public void testArrays() + { + testQuery( + "SELECT" + + " HLL_SKETCH_ESTIMATE(DS_HLL(arrayString))," + + " HLL_SKETCH_ESTIMATE(DS_HLL(arrayLong))," + + " HLL_SKETCH_ESTIMATE(DS_HLL(arrayDouble))," + + " HLL_SKETCH_ESTIMATE(DS_HLL(hll0))," + + " HLL_SKETCH_ESTIMATE(DS_HLL(hll1))," + + " HLL_SKETCH_ESTIMATE(DS_HLL(hll2))," + + " HLL_SKETCH_ESTIMATE(DS_HLL(hll3))," + + " HLL_SKETCH_ESTIMATE(DS_HLL(hll4))," + + " HLL_SKETCH_ESTIMATE(DS_HLL(hll5))" + + " FROM druid.all_types", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource("all_types") + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .aggregators( + new HllSketchBuildAggregatorFactory("a0", "arrayString", null, null, null, false, true, true), + new HllSketchBuildAggregatorFactory("a1", "arrayLong", null, null, null, false, true, true), + new HllSketchBuildAggregatorFactory("a2", "arrayDouble", null, null, null, false, true, true), + new HllSketchMergeAggregatorFactory("a3", "hll0", null, null, null, false, true), + new HllSketchMergeAggregatorFactory("a4", "hll1", null, null, null, false, true), + new HllSketchMergeAggregatorFactory("a5", "hll2", null, null, null, false, true), + new HllSketchMergeAggregatorFactory("a6", "hll3", null, null, null, false, true), + new HllSketchMergeAggregatorFactory("a7", "hll4", null, null, null, false, true), + new HllSketchMergeAggregatorFactory("a8", "hll5", null, null, null, false, true) + ) + .postAggregators( + new HllSketchToEstimatePostAggregator( + "p1", + new FieldAccessPostAggregator("p0", "a0"), + false + ), + new HllSketchToEstimatePostAggregator( + "p3", + new FieldAccessPostAggregator("p2", "a1"), + false + ), + new HllSketchToEstimatePostAggregator( + "p5", + new FieldAccessPostAggregator("p4", "a2"), + false + ), + // pre-aggregated array counts + new HllSketchToEstimatePostAggregator( + "p7", + new FieldAccessPostAggregator("p6", "a3"), + false + ), + new HllSketchToEstimatePostAggregator( + "p9", + new FieldAccessPostAggregator("p8", "a4"), + false + ), + new HllSketchToEstimatePostAggregator( + "p11", + new FieldAccessPostAggregator("p10", "a5"), + false + ), + // array element counts + new HllSketchToEstimatePostAggregator( + "p13", + new FieldAccessPostAggregator("p12", "a6"), + false + ), + new HllSketchToEstimatePostAggregator( + "p15", + new FieldAccessPostAggregator("p14", "a7"), + false + ), + new HllSketchToEstimatePostAggregator( + "p17", + new FieldAccessPostAggregator("p16", "a8"), + false + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{ + 4.000000029802323D, + 4.000000029802323D, + 4.000000029802323D, + 4.000000029802323D, + 4.000000029802323D, + 4.000000029802323D, + 5.000000049670538D, + 4.000000029802323D, + 6.000000074505807D} + ) + ); + } + private ExpressionVirtualColumn makeSketchEstimateExpression(String outputName, String field) { return new ExpressionVirtualColumn( diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java index f6d922ba3ae6..7e90ba3398c9 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java @@ -52,7 +52,7 @@ private static BufferHashGrouper makeGrouper( AggregatorAdapters.factorizeBuffered( columnSelectorFactory, ImmutableList.of( - new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2), + new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2, false), new CountAggregatorFactory("count") ) ), diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index 9487fb8651ef..91a8f5de8e37 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -31,14 +31,19 @@ import org.apache.datasketches.theta.Union; import org.apache.datasketches.theta.UpdateSketch; import org.apache.druid.data.input.MapBasedRow; +import org.apache.druid.data.input.ResourceInputSource; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregationTestHelper; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; @@ -48,6 +53,13 @@ import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.GroupByTestColumnSelectorFactory; import org.apache.druid.query.groupby.epinephelinae.GrouperTestUtil; +import org.apache.druid.segment.IncrementalIndexSegment; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.timeline.SegmentId; import org.junit.After; import org.junit.Assert; import org.junit.Rule; @@ -78,6 +90,8 @@ public class SketchAggregationTest @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + private final Closer closer; + public SketchAggregationTest(final GroupByQueryConfig config, final String vectorize) { SketchModule.registerSerde(); @@ -87,6 +101,7 @@ public SketchAggregationTest(final GroupByQueryConfig config, final String vecto tempFolder ); this.vectorize = QueryContexts.Vectorize.fromString(vectorize); + this.closer = Closer.create(); } @Parameterized.Parameters(name = "config = {0}, vectorize = {1}") @@ -104,6 +119,7 @@ public static Collection constructorFeeder() @After public void teardown() throws IOException { + closer.close(); helper.close(); } @@ -299,10 +315,10 @@ public void testThetaCardinalityOnSimpleColumn() throws Exception @Test public void testSketchMergeAggregatorFactorySerde() throws Exception { - assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null)); - assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, false, true, null)); - assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, true, false, null)); - assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, true, false, 2)); + assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null, false)); + assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, false, true, null, false)); + assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, true, false, null, false)); + assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, true, false, 2, false)); } @Test @@ -310,16 +326,16 @@ public void testSketchMergeFinalization() { SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(128).build()); - SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null); + SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null, false); Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001); - agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, true, null, null); + agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, true, null, null, false); Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001); - agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, false, null, null); + agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, false, null, null, false); Assert.assertEquals(sketch, agg.finalizeComputation(sketch)); - agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, true, null, 2); + agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, true, null, 2, false); SketchEstimateWithErrorBounds est = (SketchEstimateWithErrorBounds) agg.finalizeComputation(sketch); Assert.assertEquals(0.0, est.getEstimate(), 0.0001); Assert.assertEquals(0.0, est.getHighBound(), 0.0001); @@ -328,6 +344,270 @@ public void testSketchMergeFinalization() } + @Test + public void testArrays() throws Exception + { + AggregatorFactory[] aggs = new AggregatorFactory[]{ + new SketchMergeAggregatorFactory( + "sketch0", + "arrayString", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "sketch1", + "arrayLong", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "sketch2", + "arrayDouble", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "sketch3", + "arrayString", + null, + null, + null, + null, + false + ), + new SketchMergeAggregatorFactory( + "sketch4", + "arrayLong", + null, + null, + null, + null, + false + ), + new SketchMergeAggregatorFactory( + "sketch5", + "arrayDouble", + null, + null, + null, + null, + false + ) + }; + IndexBuilder bob = IndexBuilder.create(helper.getObjectMapper()) + .tmpDir(tempFolder.newFolder()) + .schema( + IncrementalIndexSchema.builder() + .withTimestampSpec(NestedDataTestUtils.TIMESTAMP_SPEC) + .withDimensionsSpec(NestedDataTestUtils.AUTO_DISCOVERY) + .withMetrics(aggs) + .withQueryGranularity(Granularities.NONE) + .withRollup(true) + .withMinTimestamp(0) + .build() + ) + .inputSource( + ResourceInputSource.of( + NestedDataTestUtils.class.getClassLoader(), + NestedDataTestUtils.ARRAY_TYPES_DATA_FILE + ) + ) + .inputFormat(NestedDataTestUtils.DEFAULT_JSON_INPUT_FORMAT) + .transform(TransformSpec.NONE) + .inputTmpDir(tempFolder.newFolder()); + + List realtimeSegs = ImmutableList.of( + new IncrementalIndexSegment(bob.buildIncrementalIndex(), SegmentId.dummy("test_datasource")) + ); + List segs = ImmutableList.of( + new QueryableIndexSegment(bob.buildMMappedMergedIndex(), SegmentId.dummy("test_datasource")) + ); + + GroupByQuery query = GroupByQuery.builder() + .setDataSource("test_datasource") + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ETERNITY) + .setAggregatorSpecs( + new SketchMergeAggregatorFactory( + "a0", + "arrayString", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "a1", + "arrayLong", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "a2", + "arrayDouble", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "a3", + "sketch0", + null, + null, + true, + null, + false + ), + new SketchMergeAggregatorFactory( + "a4", + "sketch1", + null, + null, + true, + null, + false + ), + new SketchMergeAggregatorFactory( + "a5", + "sketch2", + null, + null, + true, + null, + false + ), + new SketchMergeAggregatorFactory( + "a6", + "sketch3", + null, + null, + true, + null, + false + ), + new SketchMergeAggregatorFactory( + "a7", + "sketch4", + null, + null, + true, + null, + false + ), + new SketchMergeAggregatorFactory( + "a8", + "sketch5", + null, + null, + true, + null, + false + ), + new CountAggregatorFactory("a9") + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new SketchEstimatePostAggregator( + "p0", + new FieldAccessPostAggregator("f0", "a0"), + null + ), + new SketchEstimatePostAggregator( + "p1", + new FieldAccessPostAggregator("f1", "a1"), + null + ), + new SketchEstimatePostAggregator( + "p2", + new FieldAccessPostAggregator("f2", "a2"), + null + ), + new SketchEstimatePostAggregator( + "p3", + new FieldAccessPostAggregator("f3", "a3"), + null + ), + new SketchEstimatePostAggregator( + "p4", + new FieldAccessPostAggregator("f4", "a4"), + null + ), + new SketchEstimatePostAggregator( + "p5", + new FieldAccessPostAggregator("f5", "a5"), + null + ), + new SketchEstimatePostAggregator( + "p6", + new FieldAccessPostAggregator("f6", "a6"), + null + ), + new SketchEstimatePostAggregator( + "p7", + new FieldAccessPostAggregator("f7", "a7"), + null + ), + new SketchEstimatePostAggregator( + "p8", + new FieldAccessPostAggregator("f8", "a8"), + null + ) + ) + ) + .build(); + + Sequence realtimeSeq = helper.runQueryOnSegmentsObjs(realtimeSegs, query); + Sequence seq = helper.runQueryOnSegmentsObjs(segs, query); + List realtimeList = realtimeSeq.toList(); + List list = seq.toList(); + + // expect 4 distinct arrays for each of these columns from 14 rows + Assert.assertEquals(1, realtimeList.size()); + Assert.assertEquals(14L, realtimeList.get(0).get(9)); + // array column estimate counts + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(10), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(11), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(12), 0.01); + // pre-aggregated arrays counts + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(13), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(14), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(15), 0.01); + // if processAsArray is false, count is done as string mvds so it counts the total number of elements + Assert.assertEquals(5.0, (Double) realtimeList.get(0).get(16), 0.01); + Assert.assertEquals(4.0, (Double) realtimeList.get(0).get(17), 0.01); + Assert.assertEquals(6.0, (Double) realtimeList.get(0).get(18), 0.01); + + Assert.assertEquals(1, list.size()); + Assert.assertEquals(14L, list.get(0).get(9)); + // array column estimate counts + Assert.assertEquals(4.0, (Double) list.get(0).get(10), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(11), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(12), 0.01); + // pre-aggregated arrays counts + Assert.assertEquals(4.0, (Double) list.get(0).get(13), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(14), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(15), 0.01); + // if processAsArray is false, count is done as string mvds so it counts the total number of elements + Assert.assertEquals(5.0, (Double) list.get(0).get(16), 0.01); + Assert.assertEquals(4.0, (Double) list.get(0).get(17), 0.01); + Assert.assertEquals(6.0, (Double) list.get(0).get(18), 0.01); + } + private void assertAggregatorFactorySerde(AggregatorFactory agg) throws Exception { Assert.assertEquals( @@ -404,7 +684,8 @@ public void testCacheKey() 16, null, null, - null + null, + false ); final SketchMergeAggregatorFactory factory2 = new SketchMergeAggregatorFactory( "name", @@ -412,7 +693,8 @@ public void testCacheKey() 16, null, null, - null + null, + false ); final SketchMergeAggregatorFactory factory3 = new SketchMergeAggregatorFactory( "name", @@ -420,7 +702,8 @@ public void testCacheKey() 32, null, null, - null + null, + false ); Assert.assertTrue(Arrays.equals(factory1.getCacheKey(), factory2.getCacheKey())); @@ -505,7 +788,7 @@ public void testRelocation() columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); SketchHolder[] holders = helper.runRelocateVerificationTest( - new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2), + new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2, false), columnSelectorFactory, SketchHolder.class ); @@ -522,7 +805,7 @@ public void testUpdateUnionWithNullInList() value.add("bar"); List[] columnValues = new List[]{value}; final TestObjectColumnSelector selector = new TestObjectColumnSelector(columnValues); - final Aggregator agg = new SketchAggregator(selector, 4096); + final Aggregator agg = new SketchAggregator(selector, 4096, false); agg.aggregate(); Assert.assertFalse(agg.isNull()); Assert.assertNotNull(agg.get()); @@ -537,7 +820,7 @@ public void testUpdateUnionWithDouble() { Double[] columnValues = new Double[]{2.0}; final TestObjectColumnSelector selector = new TestObjectColumnSelector(columnValues); - final Aggregator agg = new SketchAggregator(selector, 4096); + final Aggregator agg = new SketchAggregator(selector, 4096, false); agg.aggregate(); Assert.assertFalse(agg.isNull()); Assert.assertNotNull(agg.get()); @@ -556,7 +839,7 @@ public void testAggregateWithSize() } final TestObjectColumnSelector selector = new TestObjectColumnSelector<>(columnValues); - final SketchAggregator agg = new SketchAggregator(selector, 128); + final SketchAggregator agg = new SketchAggregator(selector, 128, false); // Verify initial size of sketch Assert.assertEquals(48L, agg.getInitialSizeBytes()); diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java index 1d70ff30f251..5e9104cf1c77 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java @@ -41,10 +41,10 @@ public class SketchAggregatorFactoryTest { private static final SketchMergeAggregatorFactory AGGREGATOR_16384 = - new SketchMergeAggregatorFactory("x", "x", 16384, null, false, null); + new SketchMergeAggregatorFactory("x", "x", 16384, null, false, null, false); private static final SketchMergeAggregatorFactory AGGREGATOR_32768 = - new SketchMergeAggregatorFactory("x", "x", 32768, null, false, null); + new SketchMergeAggregatorFactory("x", "x", 32768, null, false, null, false); @Test public void testGuessAggregatorHeapFootprint() @@ -94,8 +94,8 @@ public void testResultArraySignature() new OldSketchBuildAggregatorFactory("oldBuild", "col", 16), new OldSketchMergeAggregatorFactory("oldMerge", "col", 16, false), new OldSketchMergeAggregatorFactory("oldMergeFinalize", "col", 16, true), - new SketchMergeAggregatorFactory("merge", "col", 16, false, false, null), - new SketchMergeAggregatorFactory("mergeFinalize", "col", 16, true, false, null) + new SketchMergeAggregatorFactory("merge", "col", 16, false, false, null, false), + new SketchMergeAggregatorFactory("mergeFinalize", "col", 16, true, false, null, false) ) .postAggregators( new FieldAccessPostAggregator("oldBuild-access", "oldBuild"), diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchToStringPostAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchToStringPostAggregatorTest.java index f2d19af813d1..b52d37a0b276 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchToStringPostAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchToStringPostAggregatorTest.java @@ -111,7 +111,7 @@ public void testCompute() { // not going to iterate over the selector since getting a summary of an empty sketch is sufficient final TestObjectColumnSelector selector = new TestObjectColumnSelector(new Object[0]); - final Aggregator agg = new SketchAggregator(selector, 4096); + final Aggregator agg = new SketchAggregator(selector, 4096, false); final Map fields = new HashMap<>(); fields.put("sketch", agg.get()); diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java index 3946ce558b19..d6ab8a9cf1c6 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/sql/ThetaSketchSqlAggregatorTest.java @@ -19,16 +19,19 @@ package org.apache.druid.query.aggregation.datasketches.theta.sql; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Injector; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.ResourceInputSource; import org.apache.druid.guice.DruidInjectorBuilder; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.Druids; +import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -115,7 +118,8 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( { SketchModule.registerSerde(); - final QueryableIndex index = IndexBuilder.create() + ObjectMapper mapper = injector.getInstance(ObjectMapper.class); + final QueryableIndex index = IndexBuilder.create(mapper) .tmpDir(temporaryFolder.newFolder()) .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) .schema( @@ -129,7 +133,8 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( null, false, false, - null + null, + false ) ) .withRollup(false) @@ -138,6 +143,84 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( .rows(TestDataBuilder.ROWS1) .buildMMappedIndex(); + final QueryableIndex indexAllTypesAuto = + IndexBuilder.create(mapper) + .tmpDir(temporaryFolder.newFolder()) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) + .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec()) + .withMetrics( + new SketchMergeAggregatorFactory( + "sketch0", + "arrayString", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "sketch1", + "arrayLong", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "sketch2", + "arrayDouble", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "sketch3", + "arrayString", + null, + null, + null, + null, + false + ), + new SketchMergeAggregatorFactory( + "sketch4", + "arrayLong", + null, + null, + null, + null, + false + ), + new SketchMergeAggregatorFactory( + "sketch5", + "arrayDouble", + null, + null, + null, + null, + false + ), + new CountAggregatorFactory("cnt") + ) + .withRollup(false) + .build() + ) + .inputSource( + ResourceInputSource.of( + NestedDataTestUtils.class.getClassLoader(), + NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE + ) + ) + .inputFormat(TestDataBuilder.DEFAULT_JSON_INPUT_FORMAT) + .inputTmpDir(temporaryFolder.newFolder()) + .buildMMappedIndex(); + return new SpecificSegmentsQuerySegmentWalker(conglomerate).add( DataSegment.builder() .dataSource(DATA_SOURCE) @@ -147,6 +230,15 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( .size(0) .build(), index + ).add( + DataSegment.builder() + .dataSource("all_types") + .interval(indexAllTypesAuto.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(), + indexAllTypesAuto ); } @@ -230,7 +322,8 @@ public void testApproxCountDistinctThetaSketch() null, null, null, - null + null, + false ), new FilteredAggregatorFactory( new SketchMergeAggregatorFactory( @@ -239,7 +332,8 @@ public void testApproxCountDistinctThetaSketch() null, null, null, - null + null, + false ), not(equality("dim2", "", ColumnType.STRING)) ), @@ -249,7 +343,8 @@ public void testApproxCountDistinctThetaSketch() null, null, null, - null + null, + false ), new SketchMergeAggregatorFactory( "a4", @@ -257,10 +352,11 @@ public void testApproxCountDistinctThetaSketch() null, null, null, - null + null, + false ), - new SketchMergeAggregatorFactory("a5", "thetasketch_dim1", 32768, null, null, null), - new SketchMergeAggregatorFactory("a6", "thetasketch_dim1", null, null, null, null) + new SketchMergeAggregatorFactory("a5", "thetasketch_dim1", 32768, null, null, null, false), + new SketchMergeAggregatorFactory("a6", "thetasketch_dim1", null, null, null, null, false) ) ) .context(QUERY_CONTEXT_DEFAULT) @@ -307,7 +403,8 @@ public void testAvgDailyCountDistinctThetaSketch() null, null, null, - null + null, + false ) ) ) @@ -433,7 +530,8 @@ public void testThetaSketchPostAggs() null, false, null, - null + null, + false ), new SketchMergeAggregatorFactory( "a2", @@ -441,7 +539,8 @@ public void testThetaSketchPostAggs() null, false, null, - null + null, + false ), new SketchMergeAggregatorFactory( "a3", @@ -449,7 +548,8 @@ public void testThetaSketchPostAggs() null, false, null, - null + null, + false ) ) ) @@ -610,7 +710,8 @@ public void testThetaSketchPostAggsFinalizeOuterSketches() null, null, null, - null + null, + false ), new SketchMergeAggregatorFactory( "a2", @@ -618,7 +719,8 @@ public void testThetaSketchPostAggsFinalizeOuterSketches() null, null, null, - null + null, + false ), new SketchMergeAggregatorFactory( "a3", @@ -626,7 +728,8 @@ public void testThetaSketchPostAggsFinalizeOuterSketches() null, null, null, - null + null, + false ) ) ) @@ -736,7 +839,8 @@ public void testThetaSketchPostAggsPostSort() null, false, null, - null + null, + false ) ) ) @@ -792,7 +896,8 @@ public void testThetaSketchPostAggsPostSortFinalizeOuterSketches() null, null, null, - null + null, + false ) ) ) @@ -840,7 +945,8 @@ public void testEmptyTimeseriesResults() null, null, null, - null + null, + false ), new SketchMergeAggregatorFactory( "a1", @@ -848,7 +954,8 @@ public void testEmptyTimeseriesResults() null, null, null, - null + null, + false ), new SketchMergeAggregatorFactory( "a2", @@ -856,7 +963,8 @@ public void testEmptyTimeseriesResults() 1024, false, null, - null + null, + false ), new SketchMergeAggregatorFactory( "a3", @@ -864,7 +972,8 @@ public void testEmptyTimeseriesResults() 1024, false, null, - null + null, + false ) ) ) @@ -903,7 +1012,8 @@ public void testGroupByAggregatorDefaultValues() null, true, null, - null + null, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ), @@ -914,7 +1024,8 @@ public void testGroupByAggregatorDefaultValues() null, true, null, - null + null, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ), @@ -925,7 +1036,8 @@ public void testGroupByAggregatorDefaultValues() 1024, false, null, - null + null, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ), @@ -936,7 +1048,8 @@ public void testGroupByAggregatorDefaultValues() 1024, false, null, - null + null, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ) @@ -984,7 +1097,8 @@ public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches() null, true, null, - null + null, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ), @@ -995,7 +1109,8 @@ public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches() null, true, null, - null + null, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ), @@ -1006,7 +1121,8 @@ public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches() 1024, true, null, - null + null, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ), @@ -1017,7 +1133,8 @@ public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches() 1024, true, null, - null + null, + false ), equality("dim1", "nonexistent", ColumnType.STRING) ) @@ -1189,4 +1306,116 @@ public void testThetaEstimateAsVirtualColumnWithTopN() ) ); } + + @Test + public void testArrays() + { + testQuery( + "SELECT\n" + + " APPROX_COUNT_DISTINCT_DS_THETA(arrayString), " + + " APPROX_COUNT_DISTINCT_DS_THETA(arrayLong), " + + " APPROX_COUNT_DISTINCT_DS_THETA(arrayDouble), " + + " APPROX_COUNT_DISTINCT_DS_THETA(sketch0), " + + " APPROX_COUNT_DISTINCT_DS_THETA(sketch1), " + + " APPROX_COUNT_DISTINCT_DS_THETA(sketch2), " + + " APPROX_COUNT_DISTINCT_DS_THETA(sketch3), " + + " APPROX_COUNT_DISTINCT_DS_THETA(sketch4), " + + " APPROX_COUNT_DISTINCT_DS_THETA(sketch5) " + + "FROM druid.all_types", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource("all_types") + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .aggregators( + ImmutableList.of( + new SketchMergeAggregatorFactory( + "a0", + "arrayString", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "a1", + "arrayLong", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "a2", + "arrayDouble", + null, + null, + null, + null, + true + ), + new SketchMergeAggregatorFactory( + "a3", + "sketch0", + null, + null, + false, + null, + false + ), + new SketchMergeAggregatorFactory( + "a4", + "sketch1", + null, + null, + false, + null, + false + ), + new SketchMergeAggregatorFactory( + "a5", + "sketch2", + null, + null, + false, + null, + false + ), + new SketchMergeAggregatorFactory( + "a6", + "sketch3", + null, + null, + false, + null, + false + ), + new SketchMergeAggregatorFactory( + "a7", + "sketch4", + null, + null, + false, + null, + false + ), + new SketchMergeAggregatorFactory( + "a8", + "sketch5", + null, + null, + false, + null, + false + ) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of(new Object[]{4L, 4L, 4L, 4L, 4L, 4L, 5L, 4L, 6L}) + ); + } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index 374f9ef3e6fb..ce99e5b9b9e5 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -287,13 +287,29 @@ private BaseBloomFilterAggregator factorizeInternal(ColumnSelectorFactory column maxNumEntries, onHeap ); - case COMPLEX: - // in an ideal world, we would check complex type, but until then assume it's a bloom filter - return new BloomFilterMergeAggregator( + case ARRAY: + return new ByteBloomFilterAggregator( columnFactory.makeColumnValueSelector(field.getDimension()), + capabilities, maxNumEntries, onHeap ); + case COMPLEX: + if (BloomFilterSerializersModule.BLOOM_FILTER_TYPE_NAME.equals(capabilities.getComplexTypeName())) { + return new BloomFilterMergeAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + onHeap + ); + } else { + // fall back to bytes aggregator + return new ByteBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + capabilities, + maxNumEntries, + onHeap + ); + } default: throw new IAE( "Cannot create bloom filter %s for invalid column type [%s]", diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java index 0ad7a179fdb6..87e9f6721b7b 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.bloom; +import org.apache.druid.math.expr.ExprEval; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseObjectColumnValueSelector; @@ -54,6 +55,8 @@ void bufferAdd(ByteBuffer buf) BloomKFilter.addFloat(buf, (float) object); } else if (object instanceof String) { BloomKFilter.addString(buf, (String) object); + } else if (object instanceof Object[]) { + BloomKFilter.addBytes(buf, ExprEval.toBytesBestEffort(object)); } else { BloomKFilter.addBytes(buf, null, 0, 0); } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java index a22fec727464..ab002496ee9c 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java @@ -28,8 +28,12 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.RangeSet; import com.google.common.hash.HashCode; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.extraction.ExtractionFn; +import org.apache.druid.segment.column.TypeSignature; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.filter.DimensionPredicateFilter; import javax.annotation.Nullable; @@ -165,6 +169,32 @@ public boolean applyNull() } }; } + + @Override + public Predicate makeArrayPredicate(@Nullable TypeSignature arrayType) + { + final ExpressionType expressionType = arrayType == null + ? null + : ExpressionType.fromColumnTypeStrict(arrayType); + if (expressionType != null) { + return input -> { + if (input == null) { + return bloomKFilter.testBytes(null, 0, 0); + } + final byte[] bytes = ExprEval.toBytes(expressionType, input); + return bloomKFilter.testBytes(bytes); + }; + } else { + // fall back to per row detection + return input -> { + if (input == null) { + return bloomKFilter.testBytes(null, 0, 0); + } + final byte[] bytes = ExprEval.toBytesBestEffort(input); + return bloomKFilter.testBytes(bytes); + }; + } + } }, extractionFn, filterTuning diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java index 5888b7d13dde..41c553d56719 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java @@ -28,6 +28,8 @@ import org.apache.druid.guice.BloomFilterSerializersModule; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; @@ -91,6 +93,13 @@ public class BloomFilterAggregatorTest extends InitializedNullHandlingTest private static final Float[] FLOAT_VALUES1 = new Float[]{0.4f, 0.8f, 23.2f}; private static final Long[] LONG_VALUES1 = new Long[]{10241L, 12312355L, 0L, 81L}; + private static final Object[] ARRAY_VALUES = new Object[]{ + new Object[]{1L, 2L}, + new Object[]{3L, 4L}, + new Object[]{0L, 1000L}, + new Object[]{null, 123L} + }; + private static final int MAX_NUM_VALUES = 15; private static BloomKFilter filter1; @@ -102,6 +111,7 @@ public class BloomFilterAggregatorTest extends InitializedNullHandlingTest private static String serializedLongFilter; private static String serializedDoubleFilter; private static String serializedFloatFilter; + private static String serializedArrayFilter; static { try { @@ -134,6 +144,11 @@ public class BloomFilterAggregatorTest extends InitializedNullHandlingTest } serializedDoubleFilter = filterToString(doubleFilter); + BloomKFilter arrayFilter = new BloomKFilter(MAX_NUM_VALUES); + for (Object o : ARRAY_VALUES) { + arrayFilter.addBytes(ExprEval.toBytes(ExpressionType.LONG_ARRAY, o)); + } + serializedArrayFilter = filterToString(arrayFilter); } catch (Exception ex) { throw new RuntimeException(ex); @@ -261,7 +276,7 @@ public void testAggregateLongValues() throws IOException TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(LONG_VALUES1)); LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, MAX_NUM_VALUES, true); - for (Long ignored : LONG_VALUES1) { + for (int i = 0; i < LONG_VALUES1.length; i++) { aggregateColumn(Collections.singletonList(selector), agg); } @@ -278,7 +293,7 @@ public void testAggregateFloatValues() throws IOException TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(FLOAT_VALUES1)); FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, MAX_NUM_VALUES, true); - for (Float ignored : FLOAT_VALUES1) { + for (int i = 0; i < FLOAT_VALUES1.length; i++) { aggregateColumn(Collections.singletonList(selector), agg); } @@ -295,7 +310,7 @@ public void testAggregateDoubleValues() throws IOException TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(DOUBLE_VALUES1)); DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, MAX_NUM_VALUES, true); - for (Double ignored : DOUBLE_VALUES1) { + for (int i = 0; i < DOUBLE_VALUES1.length; i++) { aggregateColumn(Collections.singletonList(selector), agg); } @@ -395,6 +410,49 @@ public void testBufferAggregateDoubleValues() throws IOException Assert.assertEquals(serializedDoubleFilter, serialized); } + @Test + public void testAggregateArrayValues() throws IOException + { + TestObjectColumnSelector selector = new TestObjectColumnSelector( + Arrays.asList(ARRAY_VALUES) + ); + ObjectBloomFilterAggregator agg = new ObjectBloomFilterAggregator(selector, MAX_NUM_VALUES, true); + + for (int i = 0; i < ARRAY_VALUES.length; i++) { + aggregateColumn(Collections.singletonList(selector), agg); + } + + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get()) + ); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedArrayFilter, serialized); + } + + @Test + public void testBufferAggregateArrayValues() throws IOException + { + TestObjectColumnSelector selector = new TestObjectColumnSelector( + Arrays.asList(ARRAY_VALUES) + ); + ObjectBloomFilterAggregator agg = new ObjectBloomFilterAggregator(selector, MAX_NUM_VALUES, true); + + int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); + ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); + int pos = 10; + buf.limit(pos + maxSize); + + agg.init(buf, pos); + + IntStream.range(0, ARRAY_VALUES.length) + .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); + BloomKFilter bloomKFilter = BloomKFilter.deserialize( + (ByteBuffer) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)) + ); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedArrayFilter, serialized); + } + @Test public void testCombineValues() throws IOException { @@ -672,4 +730,18 @@ public double getDouble() return values.get(pos); } } + + public static class TestObjectColumnSelector extends SteppableSelector implements ColumnValueSelector + { + public TestObjectColumnSelector(List values) + { + super(values); + } + + @Override + public Object getObject() + { + return values.get(pos); + } + } } diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java index 26290da5578c..371eae91331d 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java @@ -33,6 +33,8 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; import org.apache.druid.query.extraction.MapLookupExtractor; import org.apache.druid.query.extraction.TimeDimExtractionFn; import org.apache.druid.query.lookup.LookupExtractionFn; @@ -201,22 +203,32 @@ public void testMultiValueStringColumn() throws IOException if (NullHandling.replaceWithDefault()) { assertFilterMatchesSkipArrays( new BloomDimFilter("dim2", bloomKFilter(1000, (String) null), null), - ImmutableList.of("1", "2", "5") + isAutoSchema() ? ImmutableList.of("5") : ImmutableList.of("1", "2", "5") ); } else { assertFilterMatchesSkipArrays( new BloomDimFilter("dim2", bloomKFilter(1000, (String) null), null), - ImmutableList.of("1", "5") + isAutoSchema() ? ImmutableList.of("5") : ImmutableList.of("1", "5") ); - assertFilterMatchesSkipArrays( + assertFilterMatches( new BloomDimFilter("dim2", bloomKFilter(1000, ""), null), - ImmutableList.of("2") + isAutoSchema() ? ImmutableList.of() : ImmutableList.of("2") ); + if (isAutoSchema()) { + assertFilterMatches( + new BloomDimFilter( + "dim2", + bloomKFilter(1000, ExprEval.toBytes(ExpressionType.STRING_ARRAY, ImmutableList.of("a", "b"))), + null + ), + ImmutableList.of("0") + ); + } } - assertFilterMatchesSkipArrays(new BloomDimFilter("dim2", bloomKFilter(1000, "a"), null), ImmutableList.of("0", "3")); - assertFilterMatchesSkipArrays(new BloomDimFilter("dim2", bloomKFilter(1000, "b"), null), ImmutableList.of("0")); - assertFilterMatchesSkipArrays(new BloomDimFilter("dim2", bloomKFilter(1000, "c"), null), ImmutableList.of("4")); - assertFilterMatchesSkipArrays(new BloomDimFilter("dim2", bloomKFilter(1000, "d"), null), ImmutableList.of()); + assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "a"), null), isAutoSchema() ? ImmutableList.of() : ImmutableList.of("0", "3")); + assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "b"), null), isAutoSchema() ? ImmutableList.of() : ImmutableList.of("0")); + assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "c"), null), isAutoSchema() ? ImmutableList.of() : ImmutableList.of("4")); + assertFilterMatches(new BloomDimFilter("dim2", bloomKFilter(1000, "d"), null), ImmutableList.of()); } @Test diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java index 15152694ce4d..2a696be6aba9 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java @@ -93,6 +93,7 @@ public void testBloomFilterExprFilter() throws IOException } byte[] bytes = BloomFilterSerializersModule.bloomKFilterToBytes(filter); String base64 = StringUtils.encodeBase64String(bytes); + skipVectorize(); // fool the planner to make an expression virtual column to test bloom filter Druid expression testQuery( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 3c40affa7834..8aabb8a344e5 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -171,8 +171,8 @@ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExis new CountAggregatorFactory("count"), // FloatSumAggregator combine method takes in two Float but return Double new FloatSumAggregatorFactory("sum_added", "added"), - new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null), - new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), null, false, false), + new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null, false), + new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), null, false, false, false), new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null) }, false @@ -265,7 +265,7 @@ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExis new AggregatorFactory[]{ new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added"), - new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null), + new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null, false), new HllSketchBuildAggregatorFactory( "HLLSketchBuild", "user", @@ -273,6 +273,7 @@ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExis TgtHllType.HLL_4.name(), null, false, + false, false ), new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null) diff --git a/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java b/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java index 01d27e9006fe..abbe36030b94 100644 --- a/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java +++ b/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java @@ -142,6 +142,27 @@ public static void serialize(ByteBuffer buffer, int position, ExpressionType typ } } + public static byte[] toBytes(ExpressionType expressionType, Object o) + { + final ExprEval eval = ExprEval.ofType(expressionType, o); + return toBytes(eval); + } + + public static byte[] toBytesBestEffort(Object o) + { + final ExprEval eval = ExprEval.bestEffortOf(o); + return toBytes(eval); + } + + public static byte[] toBytes(ExprEval eval) + { + final NullableTypeStrategy strategy = eval.type().getNullableStrategy(); + final int size = strategy.estimateSizeBytes(eval.valueOrDefault()); + final ByteBuffer buffer = ByteBuffer.allocate(size); + strategy.write(buffer, eval.valueOrDefault(), size); + return buffer.array(); + } + /** * Converts a List to an appropriate array type, optionally doing some conversion to make multi-valued strings * consistent across selector types, which are not consistent in treatment of null, [], and [null]. From b67d3a988ca5540926372000ba21cae392d114d5 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 25 Jul 2023 03:03:25 -0700 Subject: [PATCH 2/2] oops --- .../bloom/ByteBloomFilterAggregator.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ByteBloomFilterAggregator.java diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ByteBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ByteBloomFilterAggregator.java new file mode 100644 index 000000000000..299b535321a7 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ByteBloomFilterAggregator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.column.TypeSignature; +import org.apache.druid.segment.column.ValueType; + +import java.nio.ByteBuffer; + +public class ByteBloomFilterAggregator extends BaseBloomFilterAggregator> +{ + private final ExpressionType columnType; + + ByteBloomFilterAggregator( + BaseObjectColumnValueSelector baseObjectColumnValueSelector, + TypeSignature columnType, + int maxNumEntries, + boolean onHeap + ) + { + super(baseObjectColumnValueSelector, maxNumEntries, onHeap); + this.columnType = ExpressionType.fromColumnTypeStrict(columnType); + } + + @Override + void bufferAdd(ByteBuffer buf) + { + final Object val = selector.getObject(); + if (val == null) { + BloomKFilter.addBytes(buf, null, 0, 0); + } else { + BloomKFilter.addBytes(buf, ExprEval.toBytes(columnType, val)); + } + } +}