diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java index bed30c85c64b..28dc5db99054 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java @@ -65,6 +65,7 @@ public class DataSketchesHllBenchmark null, null, null, + null, false ); diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java index ba9debb28815..1172d823e024 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java @@ -32,8 +32,10 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctSqlAggregator; +import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctUtf8SqlAggregator; import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchApproxQuantileSqlAggregator; import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchObjectSqlAggregator; +import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchApproxCountDistinctSqlAggregator; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; @@ -405,26 +407,35 @@ public class SqlBenchmark "SELECT * FROM foo WHERE dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100')", "SELECT * FROM foo WHERE dimSequential > '10' AND dimSequential < '8500'", "SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE 
dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100') GROUP BY 1, 2", - "SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential > '10' AND dimSequential < '8500' GROUP BY 1, 2" - + "SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential > '10' AND dimSequential < '8500' GROUP BY 1, 2", + // 28, 29, 30, 31: Approximate count distinct of strings + "SELECT APPROX_COUNT_DISTINCT_BUILTIN(dimZipf) FROM foo", + "SELECT APPROX_COUNT_DISTINCT_DS_HLL(dimZipf) FROM foo", + "SELECT APPROX_COUNT_DISTINCT_DS_HLL_UTF8(dimZipf) FROM foo", + "SELECT APPROX_COUNT_DISTINCT_DS_THETA(dimZipf) FROM foo" ); @Param({"5000000"}) private int rowsPerSegment; - @Param({"false", "force"}) + // Can be "false", "true", or "force" + @Param({"force"}) private String vectorize; - @Param({"none", "front-coded-4", "front-coded-16"}) + + // Can be "none" or "front-coded-N" + @Param({"none", "front-coded-4"}) private String stringEncoding; - @Param({"4", "5", "6", "7", "8", "10", "11", "12", "19", "21", "22", "23", "26", "27"}) + @Param({"28", "29", "30", "31"}) private String query; - @Param({STORAGE_MMAP, STORAGE_FRAME_ROW, STORAGE_FRAME_COLUMNAR}) + // Can be STORAGE_MMAP, STORAGE_FRAME_ROW, or STORAGE_FRAME_COLUMNAR + @Param({STORAGE_MMAP}) private String storageType; private SqlEngine engine; + @Nullable private PlannerFactory plannerFactory; private final Closer closer = Closer.create(); @@ -520,13 +531,19 @@ private static DruidOperatorTable createOperatorTable() try { final Set extractionOperators = new HashSet<>(); extractionOperators.add(CalciteTests.INJECTOR.getInstance(QueryLookupOperatorConversion.class)); - final Set aggregators = new HashSet<>(); - aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchApproxQuantileSqlAggregator.class)); - aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchObjectSqlAggregator.class)); final ApproxCountDistinctSqlAggregator 
countDistinctSqlAggregator = new ApproxCountDistinctSqlAggregator(new HllSketchApproxCountDistinctSqlAggregator()); - aggregators.add(new CountSqlAggregator(countDistinctSqlAggregator)); - aggregators.add(countDistinctSqlAggregator); + final Set aggregators = new HashSet<>( + ImmutableList.of( + new DoublesSketchApproxQuantileSqlAggregator(), + new DoublesSketchObjectSqlAggregator(), + new HllSketchApproxCountDistinctSqlAggregator(), + new HllSketchApproxCountDistinctUtf8SqlAggregator(), + new ThetaSketchApproxCountDistinctSqlAggregator(), + new CountSqlAggregator(countDistinctSqlAggregator), + countDistinctSqlAggregator + ) + ); return new DruidOperatorTable(aggregators, extractionOperators); } catch (Exception e) { diff --git a/codestyle/spotbugs-exclude.xml b/codestyle/spotbugs-exclude.xml index 79a6faf61c5a..19da270192db 100644 --- a/codestyle/spotbugs-exclude.xml +++ b/codestyle/spotbugs-exclude.xml @@ -45,6 +45,7 @@ + diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java index 2b65e5a40e29..4bc734dc0051 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java @@ -25,6 +25,8 @@ import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.Union; import org.apache.druid.jackson.DefaultTrueJsonIncludeFilter; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.java.util.common.StringEncodingDefaultUTF16LEJsonIncludeFilter; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.AggregatorFactory; import 
org.apache.druid.query.aggregation.ObjectAggregateCombiner; @@ -47,6 +49,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory public static final boolean DEFAULT_SHOULD_FINALIZE = true; public static final int DEFAULT_LG_K = 12; public static final TgtHllType DEFAULT_TGT_HLL_TYPE = TgtHllType.HLL_4; + public static final StringEncoding DEFAULT_STRING_ENCODING = StringEncoding.UTF16LE; static final Comparator COMPARATOR = Comparator.nullsFirst(Comparator.comparingDouble(HllSketchHolder::getEstimate)); @@ -55,6 +58,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory private final String fieldName; private final int lgK; private final TgtHllType tgtHllType; + private final StringEncoding stringEncoding; private final boolean shouldFinalize; private final boolean round; @@ -63,6 +67,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory final String fieldName, @Nullable final Integer lgK, @Nullable final String tgtHllType, + @Nullable final StringEncoding stringEncoding, final Boolean shouldFinalize, final boolean round ) @@ -71,6 +76,7 @@ public abstract class HllSketchAggregatorFactory extends AggregatorFactory this.fieldName = Objects.requireNonNull(fieldName); this.lgK = lgK == null ? DEFAULT_LG_K : lgK; this.tgtHllType = tgtHllType == null ? DEFAULT_TGT_HLL_TYPE : TgtHllType.valueOf(tgtHllType); + this.stringEncoding = stringEncoding == null ? DEFAULT_STRING_ENCODING : stringEncoding; this.shouldFinalize = shouldFinalize == null ? 
DEFAULT_SHOULD_FINALIZE : shouldFinalize; this.round = round; } @@ -100,6 +106,13 @@ public String getTgtHllType() return tgtHllType.toString(); } + @JsonProperty + @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = StringEncodingDefaultUTF16LEJsonIncludeFilter.class) + public StringEncoding getStringEncoding() + { + return stringEncoding; + } + @JsonProperty @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = DefaultTrueJsonIncludeFilter.class) public boolean isShouldFinalize() @@ -121,14 +134,23 @@ public List requiredFields() } /** - * This is a convoluted way to return a list of input field names this aggregator needs. - * Currently the returned factories are only used to obtain a field name by calling getName() method. + * Used by groupBy v1 to create a "transfer aggregator". + * + * {@inheritDoc} */ @Override public List getRequiredColumns() { return Collections.singletonList( - new HllSketchBuildAggregatorFactory(fieldName, fieldName, lgK, tgtHllType.toString(), shouldFinalize, round) + new HllSketchBuildAggregatorFactory( + fieldName, + fieldName, + lgK, + tgtHllType.toString(), + stringEncoding, + shouldFinalize, + round + ) ); } @@ -228,6 +250,7 @@ public AggregatorFactory getCombiningFactory() getName(), getLgK(), getTgtHllType(), + getStringEncoding(), isShouldFinalize(), isRound() ); @@ -236,8 +259,13 @@ public AggregatorFactory getCombiningFactory() @Override public byte[] getCacheKey() { - return new CacheKeyBuilder(getCacheTypeId()).appendString(name).appendString(fieldName) - .appendInt(lgK).appendInt(tgtHllType.ordinal()).build(); + return new CacheKeyBuilder(getCacheTypeId()) + .appendString(name) + .appendString(fieldName) + .appendInt(lgK) + .appendInt(tgtHllType.ordinal()) + .appendCacheable(stringEncoding) + .build(); } @Override @@ -255,13 +283,14 @@ public boolean equals(Object o) && round == that.round && Objects.equals(name, that.name) && Objects.equals(fieldName, that.fieldName) - && tgtHllType == that.tgtHllType; 
+ && tgtHllType == that.tgtHllType + && stringEncoding == that.stringEncoding; } @Override public int hashCode() { - return Objects.hash(name, fieldName, lgK, tgtHllType, shouldFinalize, round); + return Objects.hash(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round); } @Override @@ -272,6 +301,7 @@ public String toString() ", fieldName='" + fieldName + '\'' + ", lgK=" + lgK + ", tgtHllType=" + tgtHllType + + (stringEncoding != DEFAULT_STRING_ENCODING ? ", stringEncoding=" + stringEncoding : "") + (shouldFinalize != DEFAULT_SHOULD_FINALIZE ? ", shouldFinalize=" + shouldFinalize : "") + (round != DEFAULT_ROUND ? ", round=" + round : "") + '}'; diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java index c2d848311f98..7a086b7257d7 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregator.java @@ -21,12 +21,10 @@ import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.segment.ColumnValueSelector; -import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; /** * This aggregator builds sketches from raw data. 
@@ -34,17 +32,16 @@ */ public class HllSketchBuildAggregator implements Aggregator { - - private final ColumnValueSelector selector; + private final Consumer> processor; private HllSketch sketch; public HllSketchBuildAggregator( - final ColumnValueSelector selector, + final Consumer> processor, final int lgK, final TgtHllType tgtHllType ) { - this.selector = selector; + this.processor = processor; this.sketch = new HllSketch(lgK, tgtHllType); } @@ -54,15 +51,9 @@ public HllSketchBuildAggregator( * See https://github.com/druid-io/druid/pull/3956 */ @Override - public void aggregate() + public synchronized void aggregate() { - final Object value = selector.getObject(); - if (value == null) { - return; - } - synchronized (this) { - updateSketch(sketch, value); - } + processor.accept(() -> sketch); } /* @@ -93,36 +84,4 @@ public long getLong() { throw new UnsupportedOperationException("Not implemented"); } - - static void updateSketch(final HllSketch sketch, final Object value) - { - if (value instanceof Integer || value instanceof Long) { - sketch.update(((Number) value).longValue()); - } else if (value instanceof Float || value instanceof Double) { - sketch.update(((Number) value).doubleValue()); - } else if (value instanceof String) { - sketch.update(((String) value).toCharArray()); - } else if (value instanceof List) { - // noinspection rawtypes - for (Object entry : (List) value) { - if (entry != null) { - final String asString = entry.toString(); - if (!NullHandling.isNullOrEquivalent(asString)) { - sketch.update(asString); - } - } - } - } else if (value instanceof char[]) { - sketch.update((char[]) value); - } else if (value instanceof byte[]) { - sketch.update((byte[]) value); - } else if (value instanceof int[]) { - sketch.update((int[]) value); - } else if (value instanceof long[]) { - sketch.update((long[]) value); - } else { - throw new IAE("Unsupported type " + value.getClass()); - } - } - } diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java index 226bda132455..2762d007b2e8 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactory.java @@ -24,20 +24,23 @@ import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; +import java.util.function.Consumer; +import java.util.function.Supplier; /** * This aggregator factory is for building sketches from raw data. 
@@ -53,11 +56,12 @@ public HllSketchBuildAggregatorFactory( @JsonProperty("fieldName") final String fieldName, @JsonProperty("lgK") @Nullable final Integer lgK, @JsonProperty("tgtHllType") @Nullable final String tgtHllType, + @JsonProperty("stringEncoding") @Nullable final StringEncoding stringEncoding, @JsonProperty("shouldFinalize") final Boolean shouldFinalize, @JsonProperty("round") final boolean round ) { - super(name, fieldName, lgK, tgtHllType, shouldFinalize, round); + super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round); } @@ -76,20 +80,37 @@ protected byte getCacheTypeId() @Override public Aggregator factorize(final ColumnSelectorFactory columnSelectorFactory) { - final ColumnValueSelector selector = columnSelectorFactory.makeColumnValueSelector(getFieldName()); validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName())); - return new HllSketchBuildAggregator(selector, getLgK(), TgtHllType.valueOf(getTgtHllType())); + + final Consumer> processor = ColumnProcessors.makeProcessor( + getFieldName(), + new HllSketchBuildColumnProcessorFactory(getStringEncoding()), + columnSelectorFactory + ); + + return new HllSketchBuildAggregator( + processor, + getLgK(), + TgtHllType.valueOf(getTgtHllType()) + ); } @Override public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSelectorFactory) { - final ColumnValueSelector selector = columnSelectorFactory.makeColumnValueSelector(getFieldName()); validateInputs(columnSelectorFactory.getColumnCapabilities(getFieldName())); + + final Consumer> processor = ColumnProcessors.makeProcessor( + getFieldName(), + new HllSketchBuildColumnProcessorFactory(getStringEncoding()), + columnSelectorFactory + ); + return new HllSketchBuildBufferAggregator( - selector, + processor, getLgK(), TgtHllType.valueOf(getTgtHllType()), + getStringEncoding(), getMaxIntermediateSize() ); } @@ -104,11 +125,13 @@ public boolean canVectorize(ColumnInspector columnInspector) public 
VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) { validateInputs(selectorFactory.getColumnCapabilities(getFieldName())); - return new HllSketchBuildVectorAggregator( + + return HllSketchBuildVectorAggregator.create( selectorFactory, getFieldName(), getLgK(), TgtHllType.valueOf(getTgtHllType()), + getStringEncoding(), getMaxIntermediateSize() ); } @@ -131,6 +154,7 @@ public AggregatorFactory withName(String newName) getFieldName(), getLgK(), getTgtHllType(), + getStringEncoding(), isShouldFinalize(), isRound() ); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java index 7570e48d6907..10cde4aa25d5 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java @@ -19,12 +19,15 @@ package org.apache.druid.query.aggregation.datasketches.hll; +import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; +import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; +import java.util.function.Consumer; +import java.util.function.Supplier; /** * This aggregator builds sketches from raw data. 
@@ -32,18 +35,21 @@ */ public class HllSketchBuildBufferAggregator implements BufferAggregator { - private final ColumnValueSelector selector; + private final Consumer> processor; private final HllSketchBuildBufferAggregatorHelper helper; + private final StringEncoding stringEncoding; public HllSketchBuildBufferAggregator( - final ColumnValueSelector selector, + final Consumer> processor, final int lgK, final TgtHllType tgtHllType, + final StringEncoding stringEncoding, final int size ) { - this.selector = selector; + this.processor = processor; this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size); + this.stringEncoding = stringEncoding; } @Override @@ -55,12 +61,7 @@ public void init(final ByteBuffer buf, final int position) @Override public void aggregate(final ByteBuffer buf, final int position) { - final Object value = selector.getObject(); - if (value == null) { - return; - } - - HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value); + processor.accept(() -> helper.getSketchAtPosition(buf, position)); } @Override @@ -100,10 +101,11 @@ public void relocate(final int oldPosition, final int newPosition, final ByteBuf @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { - inspector.visit("selector", selector); + inspector.visit("processor", processor); // lgK should be inspected because different execution paths exist in HllSketch.update() that is called from // @CalledFromHotLoop-annotated aggregate() depending on the lgK. 
// See https://github.com/apache/druid/pull/6893#discussion_r250726028 inspector.visit("lgK", helper.getLgK()); + inspector.visit("stringEncoding", stringEncoding); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildColumnProcessorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildColumnProcessorFactory.java new file mode 100644 index 000000000000..d0823889578c --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildColumnProcessorFactory.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnProcessorFactory; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.data.IndexedInts; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * Scalar (non-vectorized) column processor factory. + */ +public class HllSketchBuildColumnProcessorFactory implements ColumnProcessorFactory>> +{ + private final StringEncoding stringEncoding; + + HllSketchBuildColumnProcessorFactory(StringEncoding stringEncoding) + { + this.stringEncoding = stringEncoding; + } + + @Override + public ColumnType defaultType() + { + return ColumnType.STRING; + } + + @Override + public Consumer> makeDimensionProcessor(DimensionSelector selector, boolean multiValue) + { + return sketch -> { + final IndexedInts row = selector.getRow(); + final int sz = row.size(); + + for (int i = 0; i < sz; i++) { + HllSketchBuildUtil.updateSketchWithDictionarySelector(sketch.get(), stringEncoding, selector, row.get(i)); + } + }; + } + + @Override + public Consumer> makeFloatProcessor(BaseFloatColumnValueSelector selector) + { + return sketch -> { + if (!selector.isNull()) { + // Important that this is *double* typed, since HllSketchBuildAggregator treats doubles and floats the same. 
+ final double value = selector.getFloat(); + sketch.get().update(value); + } + }; + } + + @Override + public Consumer> makeDoubleProcessor(BaseDoubleColumnValueSelector selector) + { + return sketch -> { + if (!selector.isNull()) { + sketch.get().update(selector.getDouble()); + } + }; + } + + @Override + public Consumer> makeLongProcessor(BaseLongColumnValueSelector selector) + { + return sketch -> { + if (!selector.isNull()) { + sketch.get().update(selector.getLong()); + } + }; + } + + @Override + public Consumer> makeComplexProcessor(BaseObjectColumnValueSelector selector) + { + return sketch -> { + final Object o = selector.getObject(); + + if (o != null) { + HllSketchBuildUtil.updateSketch(sketch.get(), stringEncoding, o); + } + }; + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java new file mode 100644 index 000000000000..bcd4c4eb6d90 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtil.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.UOE; +import org.apache.druid.segment.DimensionDictionarySelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.List; + +public class HllSketchBuildUtil +{ + public static void updateSketch(final HllSketch sketch, final StringEncoding stringEncoding, final Object value) + { + if (value instanceof Integer || value instanceof Long) { + sketch.update(((Number) value).longValue()); + } else if (value instanceof Float || value instanceof Double) { + sketch.update(((Number) value).doubleValue()); + } else if (value instanceof String) { + updateSketchWithString(sketch, stringEncoding, (String) value); + } else if (value instanceof List) { + // noinspection rawtypes + for (Object entry : (List) value) { + if (entry != null) { + updateSketchWithString(sketch, stringEncoding, entry.toString()); + } + } + } else if (value instanceof char[]) { + sketch.update((char[]) value); + } else if (value instanceof byte[]) { + sketch.update((byte[]) value); + } else if (value instanceof int[]) { + sketch.update((int[]) value); + } else if (value instanceof long[]) { + sketch.update((long[]) value); + } else { + throw new IAE("Unsupported type " + value.getClass()); + } + } + + public static void updateSketchWithDictionarySelector( + final HllSketch sketch, + final StringEncoding stringEncoding, + final DimensionDictionarySelector selector, + final int id + ) + { + if (stringEncoding == StringEncoding.UTF8 && selector.supportsLookupNameUtf8()) { + final ByteBuffer buf = 
selector.lookupNameUtf8(id); + + if (buf != null) { + sketch.update(buf); + } + } else { + updateSketchWithString(sketch, stringEncoding, selector.lookupName(id)); + } + } + + private static void updateSketchWithString( + final HllSketch sketch, + final StringEncoding stringEncoding, + @Nullable final String value + ) + { + if (NullHandling.isNullOrEquivalent(value)) { + return; + } + + switch (stringEncoding) { + case UTF8: + sketch.update(StringUtils.toUtf8(value)); + break; + case UTF16LE: + sketch.update(value.toCharArray()); + break; + default: + throw new UOE("Unsupported string encoding [%s]", stringEncoding); + } + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java index 275636bb68f5..2901f282303d 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildVectorAggregator.java @@ -20,35 +20,52 @@ package org.apache.druid.query.aggregation.datasketches.hll; import org.apache.datasketches.hll.TgtHllType; +import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.query.aggregation.VectorAggregator; -import org.apache.druid.query.aggregation.datasketches.util.ToObjectVectorColumnProcessorFactory; +import org.apache.druid.query.aggregation.datasketches.hll.vector.HllSketchBuildVectorProcessor; +import org.apache.druid.query.aggregation.datasketches.hll.vector.HllSketchBuildVectorProcessorFactory; import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.util.function.Supplier; public 
class HllSketchBuildVectorAggregator implements VectorAggregator { + private final HllSketchBuildVectorProcessor processor; private final HllSketchBuildBufferAggregatorHelper helper; - private final Supplier objectSupplier; - HllSketchBuildVectorAggregator( + private HllSketchBuildVectorAggregator( + final HllSketchBuildVectorProcessor processor, + final HllSketchBuildBufferAggregatorHelper helper + ) + { + this.processor = processor; + this.helper = helper; + } + + public static HllSketchBuildVectorAggregator create( final VectorColumnSelectorFactory columnSelectorFactory, final String column, final int lgK, final TgtHllType tgtHllType, + final StringEncoding stringEncoding, final int size ) { - this.helper = new HllSketchBuildBufferAggregatorHelper(lgK, tgtHllType, size); - this.objectSupplier = - ColumnProcessors.makeVectorProcessor( - column, - ToObjectVectorColumnProcessorFactory.INSTANCE, - columnSelectorFactory - ); + final HllSketchBuildBufferAggregatorHelper helper = new HllSketchBuildBufferAggregatorHelper( + lgK, + tgtHllType, + size + ); + + final HllSketchBuildVectorProcessor processor = ColumnProcessors.makeVectorProcessor( + column, + new HllSketchBuildVectorProcessorFactory(helper, stringEncoding), + columnSelectorFactory + ); + + return new HllSketchBuildVectorAggregator(processor, helper); } @Override @@ -58,36 +75,15 @@ public void init(final ByteBuffer buf, final int position) } @Override - public void aggregate(final ByteBuffer buf, final int position, final int startRow, final int endRow) + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) { - final Object[] vector = objectSupplier.get(); - for (int i = startRow; i < endRow; i++) { - final Object value = vector[i]; - if (value != null) { - HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), value); - } - } + processor.aggregate(buf, position, startRow, endRow); } @Override - public void aggregate( - final ByteBuffer buf, - final int 
numRows, - final int[] positions, - @Nullable final int[] rows, - final int positionOffset - ) + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) { - final Object[] vector = objectSupplier.get(); - - for (int i = 0; i < numRows; i++) { - final Object o = vector[rows != null ? rows[i] : i]; - - if (o != null) { - final int position = positions[i] + positionOffset; - HllSketchBuildAggregator.updateSketch(helper.getSketchAtPosition(buf, position), o); - } - } + processor.aggregate(buf, numRows, positions, rows, positionOffset); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolder.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolder.java index 81748bd61809..df0b884eaaec 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolder.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchHolder.java @@ -162,4 +162,10 @@ public HllSketchHolder merge(HllSketchHolder other) return this; } } + + @Override + public String toString() + { + return "HllSketchHolder{" + (union != null ? 
union.toString() : sketch.toString()) + "}"; + } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java index 4645e82ea2c2..833df8ab1a55 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactory.java @@ -24,6 +24,7 @@ import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; import org.apache.datasketches.hll.Union; +import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; @@ -41,6 +42,12 @@ /** * This aggregator factory is for merging existing sketches. * The input column must contain {@link HllSketch} + * + * Note: aggregators generated by this class do not directly use "stringEncoding", but it is part of this class + * anyway so we can preserve enough information to ensure that we are merging sketches in a valid way. (Sketches with + * incompatible string encodings cannot be merged meaningfully.) Currently, the only way this is exposed is through + * {@link #getMergingFactory}, which will throw {@link AggregatorFactoryNotMergeableException} if presented with + * two aggregators with two different encodings. 
*/ public class HllSketchMergeAggregatorFactory extends HllSketchAggregatorFactory { @@ -52,17 +59,20 @@ public HllSketchMergeAggregatorFactory( @JsonProperty("fieldName") final String fieldName, @JsonProperty("lgK") @Nullable final Integer lgK, @JsonProperty("tgtHllType") @Nullable final String tgtHllType, + @JsonProperty("stringEncoding") @Nullable final StringEncoding stringEncoding, @JsonProperty("shouldFinalize") final Boolean shouldFinalize, @JsonProperty("round") final boolean round ) { - super(name, fieldName, lgK, tgtHllType, shouldFinalize, round); + super(name, fieldName, lgK, tgtHllType, stringEncoding, shouldFinalize, round); } @Override public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException { - if (other.getName().equals(this.getName()) && other instanceof HllSketchMergeAggregatorFactory) { + if (other.getName().equals(this.getName()) + && other instanceof HllSketchMergeAggregatorFactory + && getStringEncoding() == ((HllSketchMergeAggregatorFactory) other).getStringEncoding()) { HllSketchMergeAggregatorFactory castedOther = (HllSketchMergeAggregatorFactory) other; if (castedOther.isShouldFinalize() == isShouldFinalize()) { @@ -71,6 +81,7 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre getName(), Math.max(getLgK(), castedOther.getLgK()), getTgtHllType().compareTo(castedOther.getTgtHllType()) < 0 ? 
castedOther.getTgtHllType() : getTgtHllType(), + getStringEncoding(), isShouldFinalize(), isRound() || castedOther.isRound() ); @@ -145,9 +156,9 @@ public AggregatorFactory withName(String newName) getFieldName(), getLgK(), getTgtHllType(), + getStringEncoding(), isShouldFinalize(), isRound() ); } - } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java index aee9bbd16721..8c7e214aa982 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java @@ -85,6 +85,7 @@ public void aggregate( final int positionOffset ) { + final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN); final Object[] vector = objectSupplier.get(); for (int i = 0; i < numRows; i++) { @@ -92,11 +93,7 @@ public void aggregate( if (o != null) { final int position = positions[i] + positionOffset; - - final WritableMemory mem = WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN) - .writableRegion(position, helper.getSize()); - - final Union union = Union.writableWrap(mem); + final Union union = Union.writableWrap(mem.writableRegion(position, helper.getSize())); union.update(o.getSketch()); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchModule.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchModule.java index 68da26a94b44..ea2f11ca785b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchModule.java +++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchModule.java @@ -29,6 +29,7 @@ import org.apache.druid.initialization.DruidModule; import org.apache.druid.query.aggregation.datasketches.hll.sql.HllPostAggExprMacros; import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctSqlAggregator; +import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctUtf8SqlAggregator; import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchEstimateOperatorConversion; import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchEstimateWithErrorBoundsOperatorConversion; import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchObjectSqlAggregator; @@ -59,6 +60,7 @@ public void configure(final Binder binder) { registerSerde(); SqlBindings.addAggregator(binder, HllSketchApproxCountDistinctSqlAggregator.class); + SqlBindings.addAggregator(binder, HllSketchApproxCountDistinctUtf8SqlAggregator.class); SqlBindings.addAggregator(binder, HllSketchObjectSqlAggregator.class); SqlBindings.addOperatorConversion(binder, HllSketchEstimateOperatorConversion.class); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctSqlAggregator.java index 29f7c819be12..757674d6aa68 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctSqlAggregator.java @@ -24,6 +24,7 @@ import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.SqlTypeFamily; import 
org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; import org.apache.druid.sql.calcite.aggregation.Aggregation; @@ -48,7 +49,7 @@ public class HllSketchApproxCountDistinctSqlAggregator extends HllSketchBaseSqlA public HllSketchApproxCountDistinctSqlAggregator() { - super(true); + super(true, StringEncoding.UTF16LE); } @Override @@ -66,10 +67,7 @@ protected Aggregation toAggregation( { return Aggregation.create( Collections.singletonList(aggregatorFactory), - finalizeAggregations ? new FinalizingFieldAccessPostAggregator( - name, - aggregatorFactory.getName() - ) : null + finalizeAggregations ? new FinalizingFieldAccessPostAggregator(name, aggregatorFactory.getName()) : null ); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctUtf8SqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctUtf8SqlAggregator.java new file mode 100644 index 000000000000..070fbd9f7337 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchApproxCountDistinctUtf8SqlAggregator.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll.sql; + +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.type.InferTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; +import org.apache.druid.sql.calcite.aggregation.Aggregation; +import org.apache.druid.sql.calcite.aggregation.SqlAggregator; +import org.apache.druid.sql.calcite.expression.OperatorConversions; + +import java.util.Collections; + +/** + * Like {@link HllSketchApproxCountDistinctSqlAggregator}, but uses {@link StringEncoding#UTF8} instead of + * {@link org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory#DEFAULT_STRING_ENCODING}. + * + * Currently undocumented. Only accepts strings, not sketches. The purpose of this function is to allow us to experiment + * with UTF-8-based HLL counting while we figure out how the API should work. + * + * See https://github.com/apache/druid/pull/11201 for details. 
+ */ +public class HllSketchApproxCountDistinctUtf8SqlAggregator + extends HllSketchBaseSqlAggregator + implements SqlAggregator +{ + public static final String NAME = "APPROX_COUNT_DISTINCT_DS_HLL_UTF8"; + private static final SqlAggFunction FUNCTION_INSTANCE = + OperatorConversions.aggregatorBuilder(NAME) + .operandNames("column", "lgK", "tgtHllType") + .operandTypes(SqlTypeFamily.STRING, SqlTypeFamily.NUMERIC, SqlTypeFamily.STRING) + .operandTypeInference(InferTypes.VARCHAR_1024) + .requiredOperandCount(1) + .literalOperands(1, 2) + .returnTypeNonNull(SqlTypeName.BIGINT) + .functionCategory(SqlFunctionCategory.NUMERIC) + .build(); + + public HllSketchApproxCountDistinctUtf8SqlAggregator() + { + super(true, StringEncoding.UTF8); + } + + @Override + public SqlAggFunction calciteFunction() + { + return FUNCTION_INSTANCE; + } + + @Override + protected Aggregation toAggregation( + String name, + boolean finalizeAggregations, + AggregatorFactory aggregatorFactory + ) + { + return Aggregation.create( + Collections.singletonList(aggregatorFactory), + finalizeAggregations ? 
new FinalizingFieldAccessPostAggregator(name, aggregatorFactory.getName()) : null + ); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java index 09f2609801dc..c6dd3e7afa02 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchBaseSqlAggregator.java @@ -27,6 +27,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlKind; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory; @@ -53,10 +54,12 @@ public abstract class HllSketchBaseSqlAggregator implements SqlAggregator private static final boolean ROUND = true; private final boolean finalizeSketch; + private final StringEncoding stringEncoding; - protected HllSketchBaseSqlAggregator(boolean finalizeSketch) + protected HllSketchBaseSqlAggregator(boolean finalizeSketch, StringEncoding stringEncoding) { this.finalizeSketch = finalizeSketch; + this.stringEncoding = stringEncoding; } @Nullable @@ -137,6 +140,11 @@ public Aggregation toDruidAggregation( columnArg.getDirectColumn(), logK, tgtHllType, + + // For HllSketchMergeAggregatorFactory, stringEncoding is only advisory to aid in detection of mismatched + // merges. It does not affect the results of the aggregator. At this point in the code, we do not know what + // the input encoding of the original sketches was, so we set it to the default. 
+ HllSketchAggregatorFactory.DEFAULT_STRING_ENCODING, finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), ROUND ); @@ -169,6 +177,11 @@ public Aggregation toDruidAggregation( dimensionSpec.getOutputName(), logK, tgtHllType, + + // For HllSketchMergeAggregatorFactory, stringEncoding is only advisory to aid in detection of mismatched + // merges. It does not affect the results of the aggregator. At this point in the code, we do not know what + // the input encoding of the original sketches was, so we set it to the default. + HllSketchAggregatorFactory.DEFAULT_STRING_ENCODING, finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), ROUND ); @@ -178,6 +191,7 @@ public Aggregation toDruidAggregation( dimensionSpec.getDimension(), logK, tgtHllType, + stringEncoding, finalizeSketch || SketchQueryContext.isFinalizeOuterSketches(plannerContext), ROUND ); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java index 8e695148b978..9d8ade636f1b 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchObjectSqlAggregator.java @@ -25,6 +25,7 @@ import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchAggregatorFactory; import org.apache.druid.sql.calcite.aggregation.Aggregation; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.OperatorConversions; @@ -47,7 +48,7 @@ public class 
HllSketchObjectSqlAggregator extends HllSketchBaseSqlAggregator imp public HllSketchObjectSqlAggregator() { - super(false); + super(false, HllSketchAggregatorFactory.DEFAULT_STRING_ENCODING); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/DoubleHllSketchBuildVectorProcessor.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/DoubleHllSketchBuildVectorProcessor.java new file mode 100644 index 000000000000..7655a19992c0 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/DoubleHllSketchBuildVectorProcessor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll.vector; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class DoubleHllSketchBuildVectorProcessor implements HllSketchBuildVectorProcessor +{ + private final HllSketchBuildBufferAggregatorHelper helper; + private final VectorValueSelector selector; + + public DoubleHllSketchBuildVectorProcessor( + final HllSketchBuildBufferAggregatorHelper helper, + final VectorValueSelector selector + ) + { + this.helper = helper; + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final double[] vector = selector.getDoubleVector(); + final boolean[] nullVector = selector.getNullVector(); + + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + for (int i = startRow; i < endRow; i++) { + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) { + sketch.update(vector[i]); + } + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + final double[] vector = selector.getDoubleVector(); + final boolean[] nullVector = selector.getNullVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? 
rows[i] : i; + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) { + final int position = positions[i] + positionOffset; + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + sketch.update(vector[idx]); + } + } + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessor.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessor.java new file mode 100644 index 000000000000..d5127b0dea5d --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll.vector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Processor for {@link org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildVectorAggregator}. 
+ * + * @see HllSketchBuildVectorProcessorFactory + */ +public interface HllSketchBuildVectorProcessor +{ + void aggregate(ByteBuffer buf, int position, int startRow, int endRow); + + void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset); +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessorFactory.java new file mode 100644 index 000000000000..aac55a2e0b72 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/HllSketchBuildVectorProcessorFactory.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll.vector; + +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper; +import org.apache.druid.segment.VectorColumnProcessorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; + +public class HllSketchBuildVectorProcessorFactory implements VectorColumnProcessorFactory +{ + private final HllSketchBuildBufferAggregatorHelper helper; + private final StringEncoding stringEncoding; + + public HllSketchBuildVectorProcessorFactory( + final HllSketchBuildBufferAggregatorHelper helper, + final StringEncoding stringEncoding + ) + { + this.helper = helper; + this.stringEncoding = stringEncoding; + } + + @Override + public HllSketchBuildVectorProcessor makeSingleValueDimensionProcessor( + ColumnCapabilities capabilities, + SingleValueDimensionVectorSelector selector + ) + { + return new SingleValueStringHllSketchBuildVectorProcessor(helper, stringEncoding, selector); + } + + @Override + public HllSketchBuildVectorProcessor makeMultiValueDimensionProcessor( + ColumnCapabilities capabilities, + MultiValueDimensionVectorSelector selector + ) + { + return new MultiValueStringHllSketchBuildVectorProcessor(helper, stringEncoding, selector); + } + + @Override + public HllSketchBuildVectorProcessor makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + // No specialized "float" version, for consistency with HllSketchBuildAggregator#updateSketch (it treats floats + // and doubles identically). 
+ return new DoubleHllSketchBuildVectorProcessor(helper, selector); + } + + @Override + public HllSketchBuildVectorProcessor makeDoubleProcessor( + ColumnCapabilities capabilities, + VectorValueSelector selector + ) + { + return new DoubleHllSketchBuildVectorProcessor(helper, selector); + } + + @Override + public HllSketchBuildVectorProcessor makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + return new LongHllSketchBuildVectorProcessor(helper, selector); + } + + @Override + public HllSketchBuildVectorProcessor makeObjectProcessor( + ColumnCapabilities capabilities, + VectorObjectSelector selector + ) + { + return new ObjectHllSketchBuildVectorProcessor(helper, stringEncoding, selector); + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/LongHllSketchBuildVectorProcessor.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/LongHllSketchBuildVectorProcessor.java new file mode 100644 index 000000000000..398aef660b7c --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/LongHllSketchBuildVectorProcessor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll.vector; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class LongHllSketchBuildVectorProcessor implements HllSketchBuildVectorProcessor +{ + private final HllSketchBuildBufferAggregatorHelper helper; + private final VectorValueSelector selector; + + public LongHllSketchBuildVectorProcessor( + final HllSketchBuildBufferAggregatorHelper helper, + final VectorValueSelector selector + ) + { + this.helper = helper; + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final long[] vector = selector.getLongVector(); + final boolean[] nullVector = selector.getNullVector(); + + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + for (int i = startRow; i < endRow; i++) { + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) { + sketch.update(vector[i]); + } + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + final long[] vector = selector.getLongVector(); + final boolean[] nullVector = selector.getNullVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? 
rows[i] : i; + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) { + final int position = positions[i] + positionOffset; + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + sketch.update(vector[idx]); + } + } + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/MultiValueStringHllSketchBuildVectorProcessor.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/MultiValueStringHllSketchBuildVectorProcessor.java new file mode 100644 index 000000000000..074c18bbd6e2 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/MultiValueStringHllSketchBuildVectorProcessor.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll.vector; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildUtil; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class MultiValueStringHllSketchBuildVectorProcessor implements HllSketchBuildVectorProcessor +{ + private final HllSketchBuildBufferAggregatorHelper helper; + private final StringEncoding stringEncoding; + private final MultiValueDimensionVectorSelector selector; + + public MultiValueStringHllSketchBuildVectorProcessor( + final HllSketchBuildBufferAggregatorHelper helper, + final StringEncoding stringEncoding, + final MultiValueDimensionVectorSelector selector + ) + { + this.helper = helper; + this.stringEncoding = stringEncoding; + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final IndexedInts[] vector = selector.getRowVector(); + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + for (int i = startRow; i < endRow; i++) { + final IndexedInts ids = vector[i]; + final int sz = ids.size(); + + for (int j = 0; j < sz; j++) { + HllSketchBuildUtil.updateSketchWithDictionarySelector( + sketch, + stringEncoding, + selector, + ids.get(j) + ); + } + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + final IndexedInts[] vector = selector.getRowVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? 
rows[i] : i; + final int position = positions[i] + positionOffset; + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + final IndexedInts ids = vector[idx]; + final int sz = ids.size(); + + for (int j = 0; j < sz; j++) { + HllSketchBuildUtil.updateSketchWithDictionarySelector( + sketch, + stringEncoding, + selector, + ids.get(j) + ); + } + } + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/ObjectHllSketchBuildVectorProcessor.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/ObjectHllSketchBuildVectorProcessor.java new file mode 100644 index 000000000000..56eceb15f5c1 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/ObjectHllSketchBuildVectorProcessor.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll.vector; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildUtil; +import org.apache.druid.segment.vector.VectorObjectSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Processor that handles cases where string columns are presented as object selectors instead of dimension selectors. + */ +public class ObjectHllSketchBuildVectorProcessor implements HllSketchBuildVectorProcessor +{ + private final HllSketchBuildBufferAggregatorHelper helper; + private final StringEncoding stringEncoding; + private final VectorObjectSelector selector; + + public ObjectHllSketchBuildVectorProcessor( + final HllSketchBuildBufferAggregatorHelper helper, + final StringEncoding stringEncoding, + final VectorObjectSelector selector + ) + { + this.helper = helper; + this.stringEncoding = stringEncoding; + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final Object[] vector = selector.getObjectVector(); + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + for (int i = startRow; i < endRow; i++) { + if (vector[i] != null) { + HllSketchBuildUtil.updateSketch( + sketch, + stringEncoding, + vector[i] + ); + } + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + final Object[] vector = selector.getObjectVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? 
rows[i] : i; + final int position = positions[i] + positionOffset; + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + if (vector[idx] != null) { + HllSketchBuildUtil.updateSketch( + sketch, + stringEncoding, + vector[idx] + ); + } + } + } +} diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/SingleValueStringHllSketchBuildVectorProcessor.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/SingleValueStringHllSketchBuildVectorProcessor.java new file mode 100644 index 000000000000..6c93b5669b81 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/vector/SingleValueStringHllSketchBuildVectorProcessor.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll.vector; + +import org.apache.datasketches.hll.HllSketch; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildUtil; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleValueStringHllSketchBuildVectorProcessor implements HllSketchBuildVectorProcessor +{ + private final HllSketchBuildBufferAggregatorHelper helper; + private final StringEncoding stringEncoding; + private final SingleValueDimensionVectorSelector selector; + + public SingleValueStringHllSketchBuildVectorProcessor( + final HllSketchBuildBufferAggregatorHelper helper, + final StringEncoding stringEncoding, + final SingleValueDimensionVectorSelector selector + ) + { + this.helper = helper; + this.stringEncoding = stringEncoding; + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final int[] vector = selector.getRowVector(); + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + for (int i = startRow; i < endRow; i++) { + HllSketchBuildUtil.updateSketchWithDictionarySelector( + sketch, + stringEncoding, + selector, + vector[i] + ); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + final int[] vector = selector.getRowVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? 
rows[i] : i; + final int position = positions[i] + positionOffset; + final HllSketch sketch = helper.getSketchAtPosition(buf, position); + + HllSketchBuildUtil.updateSketchWithDictionarySelector( + sketch, + stringEncoding, + selector, + vector[idx] + ); + } + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java index 12198fda6c90..2c4ff635faa4 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactoryTest.java @@ -21,6 +21,7 @@ import org.apache.datasketches.hll.HllSketch; import org.apache.datasketches.hll.TgtHllType; +import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.Druids; import org.apache.druid.query.aggregation.Aggregator; @@ -54,6 +55,7 @@ public class HllSketchAggregatorFactoryTest private static final String FIELD_NAME = "fieldName"; private static final int LG_K = HllSketchAggregatorFactory.DEFAULT_LG_K; private static final String TGT_HLL_TYPE = TgtHllType.HLL_4.name(); + private static final StringEncoding STRING_ENCODING = StringEncoding.UTF16LE; private static final boolean ROUND = true; private static final double ESTIMATE = Math.PI; @@ -62,7 +64,7 @@ public class HllSketchAggregatorFactoryTest @Before public void setUp() { - target = new TestHllSketchAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, ROUND); + target = new TestHllSketchAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, STRING_ENCODING, ROUND); } @Test @@ -71,6 +73,12 @@ public void testIsRound() Assert.assertEquals(ROUND, 
target.isRound()); } + @Test + public void testStringEncoding() + { + Assert.assertEquals(STRING_ENCODING, target.getStringEncoding()); + } + @Test public void testGetRequiredColumns() { @@ -126,6 +134,7 @@ public void testFinalizeComputatioNoRound() FIELD_NAME, LG_K, TGT_HLL_TYPE, + STRING_ENCODING, !ROUND ); Object actual = t.finalizeComputation(getMockSketch()); @@ -136,7 +145,9 @@ public void testFinalizeComputatioNoRound() @Test public void testEqualsSameObject() { + //noinspection EqualsWithItself Assert.assertEquals(target, target); + Assert.assertArrayEquals(target.getCacheKey(), target.getCacheKey()); } @Test @@ -159,9 +170,11 @@ public void testEqualsOtherDiffName() FIELD_NAME, LG_K, TGT_HLL_TYPE, + STRING_ENCODING, ROUND ); Assert.assertNotEquals(target, other); + Assert.assertFalse(Arrays.equals(target.getCacheKey(), other.getCacheKey())); } @Test @@ -172,9 +185,11 @@ public void testEqualsOtherDiffFieldName() FIELD_NAME + "-diff", LG_K, TGT_HLL_TYPE, + STRING_ENCODING, ROUND ); Assert.assertNotEquals(target, other); + Assert.assertFalse(Arrays.equals(target.getCacheKey(), other.getCacheKey())); } @Test @@ -185,9 +200,11 @@ public void testEqualsOtherDiffLgK() FIELD_NAME, LG_K + 1, TGT_HLL_TYPE, + STRING_ENCODING, ROUND ); Assert.assertNotEquals(target, other); + Assert.assertFalse(Arrays.equals(target.getCacheKey(), other.getCacheKey())); } @Test @@ -198,9 +215,11 @@ public void testEqualsOtherDiffTgtHllType() FIELD_NAME, LG_K, TgtHllType.HLL_8.name(), + STRING_ENCODING, ROUND ); Assert.assertNotEquals(target, other); + Assert.assertFalse(Arrays.equals(target.getCacheKey(), other.getCacheKey())); } @Test @@ -211,9 +230,13 @@ public void testEqualsOtherDiffRound() FIELD_NAME, LG_K, TGT_HLL_TYPE, + STRING_ENCODING, !ROUND ); Assert.assertNotEquals(target, other); + + // Rounding does not affect per-segment results, so it does not affect cache key + Assert.assertArrayEquals(target.getCacheKey(), other.getCacheKey()); } @Test @@ -224,9 +247,11 @@ public 
void testEqualsOtherMatches() FIELD_NAME, LG_K, TGT_HLL_TYPE, + STRING_ENCODING, ROUND ); Assert.assertEquals(target, other); + Assert.assertArrayEquals(target.getCacheKey(), other.getCacheKey()); } @Test @@ -238,7 +263,7 @@ public void testToString() .collect(Collectors.toList()); for (Field field : toStringFields) { - if ("shouldFinalize".equals(field.getName())) { + if ("shouldFinalize".equals(field.getName()) || "stringEncoding".equals(field.getName())) { // Skip; not included in the toString if it has the default value. continue; } @@ -264,6 +289,7 @@ public void testResultArraySignature() null, null, null, + null, false ), new HllSketchBuildAggregatorFactory( @@ -272,6 +298,7 @@ public void testResultArraySignature() null, null, null, + null, true ), new HllSketchMergeAggregatorFactory( @@ -280,6 +307,7 @@ public void testResultArraySignature() null, null, null, + null, false ), new HllSketchMergeAggregatorFactory( @@ -288,6 +316,7 @@ public void testResultArraySignature() null, null, null, + null, true ) ) @@ -349,10 +378,11 @@ private static class TestHllSketchAggregatorFactory extends HllSketchAggregatorF String fieldName, @Nullable Integer lgK, @Nullable String tgtHllType, + @Nullable StringEncoding stringEncoding, boolean round ) { - super(name, fieldName, lgK, tgtHllType, null, round); + super(name, fieldName, lgK, tgtHllType, stringEncoding, null, round); } @Override @@ -388,7 +418,14 @@ public int getMaxIntermediateSize() @Override public AggregatorFactory withName(String newName) { - return new TestHllSketchAggregatorFactory(newName, getFieldName(), getLgK(), getTgtHllType(), isRound()); + return new TestHllSketchAggregatorFactory( + newName, + getFieldName(), + getLgK(), + getTgtHllType(), + getStringEncoding(), + isRound() + ); } } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java index 91f78ab40ff3..b8acb0ce2c22 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; @@ -60,6 +61,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest private final AggregationTestHelper groupByHelper; private final AggregationTestHelper timeseriesHelper; private final QueryContexts.Vectorize vectorize; + private final StringEncoding stringEncoding; @Rule public final TemporaryFolder groupByFolder = new TemporaryFolder(); @@ -67,7 +69,7 @@ public class HllSketchAggregatorTest extends InitializedNullHandlingTest @Rule public final TemporaryFolder timeseriesFolder = new TemporaryFolder(); - public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize) + public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize, StringEncoding stringEncoding) { HllSketchModule.registerSerde(); groupByHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( @@ -77,15 +79,20 @@ public HllSketchAggregatorTest(GroupByQueryConfig config, String vectorize) new HllSketchModule().getJacksonModules(), timeseriesFolder ); this.vectorize = QueryContexts.Vectorize.fromString(vectorize); + this.stringEncoding = stringEncoding; } - @Parameterized.Parameters(name = "config = {0}, vectorize 
= {1}") + @Parameterized.Parameters(name = "groupByConfig = {0}, vectorize = {1}, stringEncoding = {2}") public static Collection constructorFeeder() { final List constructors = new ArrayList<>(); for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { - for (String vectorize : new String[]{"false", "true", "force"}) { - constructors.add(new Object[]{config, vectorize}); + for (String vectorize : new String[]{"false", "force"}) { + for (StringEncoding stringEncoding : StringEncoding.values()) { + if (!("v1".equals(config.getDefaultStrategy()) && "force".equals(vectorize))) { + constructors.add(new Object[]{config, vectorize, stringEncoding}); + } + } } } return constructors; @@ -100,11 +107,11 @@ public void ingestSketches() throws Exception Arrays.asList("dim", "multiDim"), Arrays.asList("timestamp", "dim", "multiDim", "sketch") ), - buildAggregatorJson("HLLSketchMerge", "sketch", !ROUND), + buildAggregatorJson("HLLSketchMerge", "sketch", !ROUND, stringEncoding), 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount - buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND) + 200, // maxRowCount + buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND, stringEncoding) ); List results = seq.toList(); Assert.assertEquals(1, results.size()); @@ -120,17 +127,36 @@ public void ingestSketchesTimeseries() throws Exception Arrays.asList("dim", "multiDim"), Arrays.asList("timestamp", "dim", "multiDim", "sketch") ); - final String aggregators = buildAggregatorJson("HLLSketchMerge", "sketch", !ROUND); + final String aggregators = + buildAggregatorJson("HLLSketchMerge", "sketch", !ROUND, HllSketchAggregatorFactory.DEFAULT_STRING_ENCODING); final int minTimestamp = 0; final Granularity gran = Granularities.NONE; final int maxRowCount = 10; final String queryJson = buildTimeseriesQueryJson("HLLSketchMerge", "sketch", !ROUND); File segmentDir1 = timeseriesFolder.newFolder(); - timeseriesHelper.createIndex(inputFile, parserJson, aggregators, segmentDir1, 
minTimestamp, gran, maxRowCount, true); + timeseriesHelper.createIndex( + inputFile, + parserJson, + aggregators, + segmentDir1, + minTimestamp, + gran, + maxRowCount, + true + ); File segmentDir2 = timeseriesFolder.newFolder(); - timeseriesHelper.createIndex(inputFile, parserJson, aggregators, segmentDir2, minTimestamp, gran, maxRowCount, true); + timeseriesHelper.createIndex( + inputFile, + parserJson, + aggregators, + segmentDir2, + minTimestamp, + gran, + maxRowCount, + true + ); Sequence seq = timeseriesHelper.runQueryOnSegments(Arrays.asList(segmentDir1, segmentDir2), queryJson); List results = seq.toList(); @@ -148,11 +174,11 @@ public void buildSketchesAtIngestionTime() throws Exception Collections.singletonList("dim"), Arrays.asList("timestamp", "dim", "multiDim", "id") ), - buildAggregatorJson("HLLSketchBuild", "id", !ROUND), + buildAggregatorJson("HLLSketchBuild", "id", !ROUND, stringEncoding), 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount - buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND) + 200, // maxRowCount + buildGroupByQueryJson("HLLSketchMerge", "sketch", !ROUND, stringEncoding) ); List results = seq.toList(); Assert.assertEquals(1, results.size()); @@ -169,10 +195,10 @@ public void buildSketchesAtIngestionTimeTimeseries() throws Exception Collections.singletonList("dim"), Arrays.asList("timestamp", "dim", "multiDim", "id") ), - buildAggregatorJson("HLLSketchBuild", "id", !ROUND), + buildAggregatorJson("HLLSketchBuild", "id", !ROUND, stringEncoding), 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount + 200, // maxRowCount buildTimeseriesQueryJson("HLLSketchMerge", "sketch", !ROUND) ); List results = seq.toList(); @@ -193,8 +219,8 @@ public void buildSketchesAtQueryTime() throws Exception "[]", 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount - buildGroupByQueryJson("HLLSketchBuild", "id", !ROUND) + 200, // maxRowCount + buildGroupByQueryJson("HLLSketchBuild", "id", !ROUND, stringEncoding) ); List results 
= seq.toList(); Assert.assertEquals(1, results.size()); @@ -214,7 +240,7 @@ public void buildSketchesAtQueryTimeTimeseries() throws Exception "[]", 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount + 200, // maxRowCount buildTimeseriesQueryJson("HLLSketchBuild", "id", !ROUND) ); List results = seq.toList(); @@ -241,8 +267,8 @@ public void unsuccessfulComplexTypesInHLL() throws Exception metricSpec, 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount - buildGroupByQueryJson("HLLSketchBuild", "index_hll", !ROUND) + 200, // maxRowCount + buildGroupByQueryJson("HLLSketchMerge", "sketch", ROUND, stringEncoding) ); } catch (RuntimeException e) { @@ -263,8 +289,8 @@ public void buildSketchesAtQueryTimeMultiValue() throws Exception "[]", 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount - buildGroupByQueryJson("HLLSketchBuild", "multiDim", !ROUND) + 200, // maxRowCount + buildGroupByQueryJson("HLLSketchBuild", "multiDim", !ROUND, stringEncoding) ); List results = seq.toList(); Assert.assertEquals(1, results.size()); @@ -284,8 +310,8 @@ public void roundBuildSketch() throws Exception "[]", 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount - buildGroupByQueryJson("HLLSketchBuild", "id", ROUND) + 200, // maxRowCount + buildGroupByQueryJson("HLLSketchBuild", "id", ROUND, stringEncoding) ); List results = seq.toList(); Assert.assertEquals(1, results.size()); @@ -302,11 +328,11 @@ public void roundMergeSketch() throws Exception Arrays.asList("dim", "multiDim"), Arrays.asList("timestamp", "dim", "multiDim", "sketch") ), - buildAggregatorJson("HLLSketchMerge", "sketch", ROUND), + buildAggregatorJson("HLLSketchMerge", "sketch", ROUND, stringEncoding), 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount - buildGroupByQueryJson("HLLSketchMerge", "sketch", ROUND) + 200, // maxRowCount + buildGroupByQueryJson("HLLSketchMerge", "sketch", ROUND, stringEncoding) ); List results = seq.toList(); Assert.assertEquals(1, results.size()); @@ -323,17 
+349,17 @@ public void testPostAggs() throws Exception Arrays.asList("dim", "multiDim"), Arrays.asList("timestamp", "dim", "multiDim", "sketch") ), - buildAggregatorJson("HLLSketchMerge", "sketch", ROUND), + buildAggregatorJson("HLLSketchMerge", "sketch", ROUND, stringEncoding), 0, // minTimestamp Granularities.NONE, - 10, // maxRowCount + 200, // maxRowCount groupByHelper.getObjectMapper().writeValueAsString( GroupByQuery.builder() .setDataSource("test_datasource") .setGranularity(Granularities.ALL) .setInterval(Intervals.ETERNITY) .setAggregatorSpecs( - new HllSketchMergeAggregatorFactory("sketch", "sketch", null, null, null, false) + new HllSketchMergeAggregatorFactory("sketch", "sketch", null, null, null, null, false) ) .setPostAggregatorSpecs( ImmutableList.of( @@ -362,7 +388,8 @@ public void testPostAggs() throws Exception ), new FieldAccessPostAggregator("f2", "sketch")), null, null - ) + ), + new FieldAccessPostAggregator("fieldAccess", "sketch") ) ) .build() @@ -378,6 +405,7 @@ public void testPostAggs() throws Exception + " UB : 200.01008469948434\n" + " OutOfOrder Flag: false\n" + " Coupon Count : 200\n"; + List results = seq.toList(); Assert.assertEquals(1, results.size()); ResultRow row = results.get(0); @@ -430,13 +458,15 @@ private static String toJson(Object object) private static String buildAggregatorJson( String aggregationType, String aggregationFieldName, - boolean aggregationRound + boolean aggregationRound, + StringEncoding stringEncoding ) { Map aggregator = buildAggregatorObject( aggregationType, aggregationFieldName, - aggregationRound + aggregationRound, + stringEncoding ); return toJson(Collections.singletonList(aggregator)); } @@ -444,27 +474,32 @@ private static String buildAggregatorJson( private static Map buildAggregatorObject( String aggregationType, String aggregationFieldName, - boolean aggregationRound + boolean aggregationRound, + StringEncoding stringEncoding ) { - return ImmutableMap.of( - "type", aggregationType, - "name", 
"sketch", - "fieldName", aggregationFieldName, - "round", aggregationRound - ); + return ImmutableMap.builder() + .put("type", aggregationType) + .put("name", "sketch") + .put("fieldName", aggregationFieldName) + .put("round", aggregationRound) + .put("tgtHllType", "HLL_8") + .put("stringEncoding", stringEncoding.toString()) + .build(); } private String buildGroupByQueryJson( String aggregationType, String aggregationFieldName, - boolean aggregationRound + boolean aggregationRound, + StringEncoding stringEncoding ) { Map aggregation = buildAggregatorObject( aggregationType, aggregationFieldName, - aggregationRound + aggregationRound, + stringEncoding ); Map object = new ImmutableMap.Builder() .put("queryType", "groupBy") @@ -472,6 +507,12 @@ private String buildGroupByQueryJson( .put("granularity", "ALL") .put("dimensions", Collections.emptyList()) .put("aggregations", Collections.singletonList(aggregation)) + .put( + "postAggregations", + Collections.singletonList( + ImmutableMap.of("type", "fieldAccess", "name", "sketch_raw", "fieldName", "sketch") + ) + ) .put("intervals", Collections.singletonList("2017-01-01T00:00:00.000Z/2017-01-31T00:00:00.000Z")) .put("context", ImmutableMap.of(QueryContexts.VECTORIZE_KEY, vectorize.toString())) .build(); @@ -487,7 +528,8 @@ private String buildTimeseriesQueryJson( Map aggregation = buildAggregatorObject( aggregationType, aggregationFieldName, - aggregationRound + aggregationRound, + HllSketchAggregatorFactory.DEFAULT_STRING_ENCODING ); Map object = new ImmutableMap.Builder() .put("queryType", "timeseries") diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactoryTest.java new file mode 100644 index 000000000000..51ca671cd0d7 --- /dev/null +++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorFactoryTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.datasketches.hll.TgtHllType; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class HllSketchBuildAggregatorFactoryTest +{ + private final ObjectMapper jsonMapper; + + public HllSketchBuildAggregatorFactoryTest() + { + this.jsonMapper = TestHelper.makeJsonMapper().copy(); + jsonMapper.registerModules(new HllSketchModule().getJacksonModules()); + } + + @Test + public void testSerde() throws IOException + { + final HllSketchBuildAggregatorFactory factory = new HllSketchBuildAggregatorFactory( + "foo", + "bar", + 18, + TgtHllType.HLL_8.name(), + StringEncoding.UTF8, + false, + true + ); + + final String serializedString = 
jsonMapper.writeValueAsString(factory); + + Assert.assertEquals( + "{\"type\":\"HLLSketchBuild\",\"name\":\"foo\",\"fieldName\":\"bar\",\"lgK\":18,\"tgtHllType\":\"HLL_8\"," + + "\"stringEncoding\":\"utf8\",\"shouldFinalize\":false,\"round\":true}", + serializedString + ); + + final AggregatorFactory factory2 = jsonMapper.readValue( + serializedString, + AggregatorFactory.class + ); + + Assert.assertEquals(factory, factory2); + } + + @Test + public void testSerdeWithDefaults() throws IOException + { + final HllSketchBuildAggregatorFactory factory = new HllSketchBuildAggregatorFactory( + "foo", + "bar", + null, + null, + null, + null, + false + ); + + final String serializedString = jsonMapper.writeValueAsString(factory); + + Assert.assertEquals( + "{\"type\":\"HLLSketchBuild\"," + + "\"name\":\"foo\"," + + "\"fieldName\":\"bar\"," + + "\"lgK\":12," + + "\"tgtHllType\":\"HLL_4\"" + + "}", + serializedString + ); + + final AggregatorFactory factory2 = jsonMapper.readValue( + serializedString, + AggregatorFactory.class + ); + + Assert.assertEquals(factory, factory2); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(HllSketchBuildAggregatorFactory.class).usingGetClass().verify(); + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorTest.java deleted file mode 100644 index f1163407a911..000000000000 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildAggregatorTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.aggregation.datasketches.hll; - -import org.apache.datasketches.hll.HllSketch; -import org.apache.druid.testing.InitializedNullHandlingTest; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; - -/** - * Tests for {@link HllSketchBuildAggregator#updateSketch}. - * - * Tests of the aggregator generally should go in {@link HllSketchAggregatorTest} instead. 
- */ -public class HllSketchBuildAggregatorTest extends InitializedNullHandlingTest -{ - private final HllSketch sketch = new HllSketch(HllSketch.DEFAULT_LG_K); - - @Test - public void testUpdateSketchVariousNumbers() - { - updateSketch(1L, -2L, 1L, -2, 1L, 2.0, 2f, Double.doubleToLongBits(2.0), 3.0); - assertSketchEstimate(4); - } - - @Test - public void testUpdateSketchStrings() - { - updateSketch("foo", null, "bar", ""); - assertSketchEstimate(2); - } - - @Test - public void testUpdateSketchListsOfStrings() - { - updateSketch( - Arrays.asList("1", "2"), - Arrays.asList("2", "", "3", "11"), - Arrays.asList("1", null, "3", "12"), - Arrays.asList("1", "3", "13") - ); - - assertSketchEstimate(6); - } - - @Test - public void testUpdateSketchCharArray() - { - updateSketch( - new char[]{1, 2}, - new char[]{2, 3, 11}, - new char[]{1, 2}, - new char[]{1, 3, 13} - ); - - assertSketchEstimate(3); - } - - @Test - public void testUpdateSketchByteArray() - { - updateSketch( - new byte[]{1, 2}, - new byte[]{2, 3, 11}, - new byte[]{1, 2}, - new byte[]{1, 3, 13} - ); - - assertSketchEstimate(3); - } - - @Test - public void testUpdateSketchIntArray() - { - updateSketch( - new int[]{1, 2}, - new int[]{2, 3, 11}, - new int[]{1, 2}, - new int[]{1, 3, 13} - ); - - assertSketchEstimate(3); - } - - @Test - public void testUpdateSketchLongArray() - { - updateSketch( - new long[]{1, 2}, - new long[]{2, 3, 11}, - new long[]{1, 2}, - new long[]{1, 3, 13} - ); - - assertSketchEstimate(3); - } - - private void updateSketch(final Object first, final Object... others) - { - // first != null check mimics how updateSketch is called: it's always guarded by a null check on the outer value. 
- if (first != null) { - HllSketchBuildAggregator.updateSketch(sketch, first); - } - - for (final Object o : others) { - if (o != null) { - HllSketchBuildAggregator.updateSketch(sketch, o); - } - } - } - - private void assertSketchEstimate(final long estimate) - { - Assert.assertEquals((double) estimate, sketch.getEstimate(), 0.1); - } -} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtilTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtilTest.java new file mode 100644 index 000000000000..eca5e6f37a4f --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildUtilTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.datasketches.hll; + +import com.google.common.collect.ImmutableMap; +import org.apache.datasketches.hll.HllSketch; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.DimensionDictionarySelector; +import org.apache.druid.segment.IdLookup; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; + +/** + * Tests for {@link HllSketchBuildUtil#updateSketch}. + */ +public class HllSketchBuildUtilTest extends InitializedNullHandlingTest +{ + private static final Map DICTIONARY = ImmutableMap.of( + 1, "bar", + 2, "foo" + ); + + private final HllSketch sketch = new HllSketch(HllSketch.DEFAULT_LG_K); + + @Test + public void testUpdateSketchListsOfStringsUTF16LE() + { + updateSketch( + StringEncoding.UTF16LE, + Arrays.asList("1", "2"), + Arrays.asList("2", "", "3", "11"), + Arrays.asList("1", null, "3", "12"), + Arrays.asList("1", "3", "13") + ); + + assertSketchEstimate(6); + } + + @Test + public void testUpdateSketchListsOfStringsUTF8() + { + updateSketch( + StringEncoding.UTF8, + Arrays.asList("1", "2"), + Arrays.asList("2", "", "3", "11"), + Arrays.asList("1", null, "3", "12"), + Arrays.asList("1", "3", "13") + ); + + assertSketchEstimate(6); + } + + @Test + public void testUpdateSketchCharArray() + { + updateSketch( + StringEncoding.UTF16LE, + new char[]{1, 2}, + new char[]{2, 3, 11}, + new char[]{1, 2}, + new char[]{1, 3, 13} + ); + + assertSketchEstimate(3); + } + + @Test + public void testUpdateSketchByteArray() + { + updateSketch( + StringEncoding.UTF16LE, + new byte[]{1, 2}, + new byte[]{2, 3, 11}, + new byte[]{1, 2}, + new byte[]{1, 3, 13} + ); + + assertSketchEstimate(3); + } + + @Test + public void testUpdateSketchIntArray() + { + updateSketch( + 
StringEncoding.UTF16LE, + new int[]{1, 2}, + new int[]{2, 3, 11}, + new int[]{1, 2}, + new int[]{1, 3, 13} + ); + + assertSketchEstimate(3); + } + + @Test + public void testUpdateSketchLongArray() + { + updateSketch( + StringEncoding.UTF16LE, + new long[]{1, 2}, + new long[]{2, 3, 11}, + new long[]{1, 2}, + new long[]{1, 3, 13} + ); + + assertSketchEstimate(3); + } + + @Test + public void testUpdateSketchWithDictionarySelector8to8() + { + final TestDictionarySelector selector = new TestDictionarySelector(DICTIONARY, true); + updateSketchWithDictionarySelector(StringEncoding.UTF8, selector, 0, 1, 2, 1); + assertSketchEstimate(2); + } + + @Test + public void testUpdateSketchWithDictionarySelector8to16() + { + final TestDictionarySelector selector = new TestDictionarySelector(DICTIONARY, true); + updateSketchWithDictionarySelector(StringEncoding.UTF16LE, selector, 0, 1, 2, 1); + assertSketchEstimate(2); + } + + @Test + public void testUpdateSketchWithDictionarySelector16to8() + { + final TestDictionarySelector selector = new TestDictionarySelector(DICTIONARY, false); + updateSketchWithDictionarySelector(StringEncoding.UTF8, selector, 0, 1, 2, 1); + assertSketchEstimate(2); + } + + @Test + public void testUpdateSketchWithDictionarySelector16to16() + { + final TestDictionarySelector selector = new TestDictionarySelector(DICTIONARY, false); + updateSketchWithDictionarySelector(StringEncoding.UTF16LE, selector, 0, 1, 2, 1); + assertSketchEstimate(2); + } + + @Test + public void testUpdateSketchWithDictionarySelectorMixedTo8() + { + final TestDictionarySelector selector1 = new TestDictionarySelector(DICTIONARY, false); + final TestDictionarySelector selector2 = new TestDictionarySelector(DICTIONARY, true); + updateSketchWithDictionarySelector(StringEncoding.UTF8, selector1, 0, 1, 2, 1); + updateSketchWithDictionarySelector(StringEncoding.UTF8, selector2, 0, 1, 2, 1); + assertSketchEstimate(2); // Duplicates are de-duplicated + } + + @Test + public void 
testUpdateSketchWithDictionarySelectorMixedTo16() + { + final TestDictionarySelector selector1 = new TestDictionarySelector(DICTIONARY, false); + final TestDictionarySelector selector2 = new TestDictionarySelector(DICTIONARY, true); + updateSketchWithDictionarySelector(StringEncoding.UTF16LE, selector1, 0, 1, 2, 1); + updateSketchWithDictionarySelector(StringEncoding.UTF16LE, selector2, 0, 1, 2, 1); + assertSketchEstimate(2); // Duplicates are de-duplicated + } + + @Test + public void testUpdateSketchWithDictionarySelector8ToMixed() + { + final TestDictionarySelector selector = new TestDictionarySelector(DICTIONARY, true); + updateSketchWithDictionarySelector(StringEncoding.UTF8, selector, 0, 1, 2, 1); + updateSketchWithDictionarySelector(StringEncoding.UTF16LE, selector, 0, 1, 2, 1); + assertSketchEstimate(4); // Incompatible hashes + } + + @Test + public void testUpdateSketchWithDictionarySelector16ToMixed() + { + final TestDictionarySelector selector = new TestDictionarySelector(DICTIONARY, false); + updateSketchWithDictionarySelector(StringEncoding.UTF8, selector, 0, 1, 2, 1); + updateSketchWithDictionarySelector(StringEncoding.UTF16LE, selector, 0, 1, 2, 1); + assertSketchEstimate(4); // Incompatible hashes + } + + private void updateSketch(final StringEncoding stringEncoding, final Object first, final Object... others) + { + // first != null check mimics how updateSketch is called: it's always guarded by a null check on the outer value. + if (first != null) { + HllSketchBuildUtil.updateSketch(sketch, stringEncoding, first); + } + + for (final Object o : others) { + if (o != null) { + HllSketchBuildUtil.updateSketch(sketch, stringEncoding, o); + } + } + } + + private void updateSketchWithDictionarySelector( + final StringEncoding stringEncoding, + final DimensionDictionarySelector selector, + final int... 
ids + ) + { + for (int id : ids) { + HllSketchBuildUtil.updateSketchWithDictionarySelector(sketch, stringEncoding, selector, id); + } + } + + private void assertSketchEstimate(final long estimate) + { + Assert.assertEquals((double) estimate, sketch.getEstimate(), 0.1); + } + + private static class TestDictionarySelector implements DimensionDictionarySelector + { + private final Map dictionary; + private final boolean supportsLookupNameUtf8; + + public TestDictionarySelector(final Map dictionary, final boolean supportsLookupNameUtf8) + { + this.dictionary = dictionary; + this.supportsLookupNameUtf8 = supportsLookupNameUtf8; + } + + @Override + public int getValueCardinality() + { + // Unused by this test + throw new UnsupportedOperationException(); + } + + @Nullable + @Override + public String lookupName(int id) + { + return dictionary.get(id); + } + + @Nullable + @Override + public ByteBuffer lookupNameUtf8(int id) + { + if (!supportsLookupNameUtf8) { + throw new UnsupportedOperationException(); + } + + final String s = dictionary.get(id); + + if (s == null) { + return null; + } else { + return ByteBuffer.wrap(StringUtils.toUtf8(s)); + } + } + + @Override + public boolean supportsLookupNameUtf8() + { + return supportsLookupNameUtf8; + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return true; + } + + @Nullable + @Override + public IdLookup idLookup() + { + return null; + } + } +} diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java index ee1bd965e190..101b25b99be0 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java +++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeAggregatorFactoryTest.java @@ -19,18 +19,26 @@ package org.apache.druid.query.aggregation.datasketches.hll; +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.datasketches.hll.TgtHllType; +import org.apache.druid.java.util.common.StringEncoding; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import org.apache.druid.segment.TestHelper; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.IOException; + public class HllSketchMergeAggregatorFactoryTest { private static final String NAME = "name"; private static final String FIELD_NAME = "fieldName"; private static final int LG_K = 2; private static final String TGT_HLL_TYPE = TgtHllType.HLL_6.name(); + private static final StringEncoding STRING_ENCODING = StringEncoding.UTF16LE; private static final boolean SHOULD_FINALIZE = true; private static final boolean ROUND = true; @@ -40,8 +48,24 @@ public class HllSketchMergeAggregatorFactoryTest @Before public void setUp() { - targetRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, SHOULD_FINALIZE, ROUND); - targetNoRound = new HllSketchMergeAggregatorFactory(NAME, FIELD_NAME, LG_K, TGT_HLL_TYPE, SHOULD_FINALIZE, !ROUND); + targetRound = new HllSketchMergeAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + TGT_HLL_TYPE, + STRING_ENCODING, + SHOULD_FINALIZE, + ROUND + ); + targetNoRound = new HllSketchMergeAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + TGT_HLL_TYPE, + STRING_ENCODING, + SHOULD_FINALIZE, + !ROUND + ); } @Test(expected = AggregatorFactoryNotMergeableException.class) @@ -52,6 +76,7 @@ public void testGetMergingFactoryBadName() throws Exception FIELD_NAME, LG_K, TGT_HLL_TYPE, + STRING_ENCODING, SHOULD_FINALIZE, ROUND ); 
@@ -66,12 +91,29 @@ public void testGetMergingFactoryBadType() throws Exception FIELD_NAME, LG_K, TGT_HLL_TYPE, + STRING_ENCODING, SHOULD_FINALIZE, ROUND ); targetRound.getMergingFactory(other); } + @Test(expected = AggregatorFactoryNotMergeableException.class) + public void testGetMergingFactoryDifferentStringEncoding() throws Exception + { + HllSketchMergeAggregatorFactory other = new HllSketchMergeAggregatorFactory( + NAME, + FIELD_NAME, + LG_K, + TGT_HLL_TYPE, + StringEncoding.UTF8, + SHOULD_FINALIZE, + ROUND + ); + HllSketchAggregatorFactory result = (HllSketchAggregatorFactory) targetRound.getMergingFactory(other); + Assert.assertEquals(LG_K, result.getLgK()); + } + @Test public void testGetMergingFactoryOtherSmallerLgK() throws Exception { @@ -81,6 +123,7 @@ public void testGetMergingFactoryOtherSmallerLgK() throws Exception FIELD_NAME, smallerLgK, TGT_HLL_TYPE, + STRING_ENCODING, SHOULD_FINALIZE, ROUND ); @@ -97,6 +140,7 @@ public void testGetMergingFactoryOtherLargerLgK() throws Exception FIELD_NAME, largerLgK, TGT_HLL_TYPE, + STRING_ENCODING, SHOULD_FINALIZE, ROUND ); @@ -113,6 +157,7 @@ public void testGetMergingFactoryOtherSmallerTgtHllType() throws Exception FIELD_NAME, LG_K, smallerTgtHllType, + STRING_ENCODING, SHOULD_FINALIZE, ROUND ); @@ -129,6 +174,7 @@ public void testGetMergingFactoryOtherLargerTgtHllType() throws Exception FIELD_NAME, LG_K, largerTgtHllType, + STRING_ENCODING, SHOULD_FINALIZE, ROUND ); @@ -164,6 +210,80 @@ public void testGetMergingFactoryThisRoundOtherRound() throws Exception Assert.assertTrue(result.isRound()); } + @Test + public void testSerde() throws IOException + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper().copy(); + jsonMapper.registerModules(new HllSketchModule().getJacksonModules()); + + final HllSketchMergeAggregatorFactory factory = new HllSketchMergeAggregatorFactory( + "foo", + "bar", + 18, + TgtHllType.HLL_8.name(), + StringEncoding.UTF8, + false, + true + ); + + final String serializedString = 
jsonMapper.writeValueAsString(factory); + + Assert.assertEquals( + "{\"type\":\"HLLSketchMerge\",\"name\":\"foo\",\"fieldName\":\"bar\",\"lgK\":18,\"tgtHllType\":\"HLL_8\"," + + "\"stringEncoding\":\"utf8\",\"shouldFinalize\":false,\"round\":true}", + serializedString + ); + + final AggregatorFactory factory2 = jsonMapper.readValue( + serializedString, + AggregatorFactory.class + ); + + Assert.assertEquals(factory, factory2); + } + + @Test + public void testSerdeWithDefaults() throws IOException + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper().copy(); + jsonMapper.registerModules(new HllSketchModule().getJacksonModules()); + + final HllSketchMergeAggregatorFactory factory = new HllSketchMergeAggregatorFactory( + "foo", + "bar", + null, + null, + null, + null, + false + ); + + final String serializedString = jsonMapper.writeValueAsString(factory); + + Assert.assertEquals( + "{\"type\":\"HLLSketchMerge\"," + + "\"name\":\"foo\"," + + "\"fieldName\":\"bar\"," + + "\"lgK\":12," + + "\"tgtHllType\":\"HLL_4\"" + + "}", + serializedString + ); + + final AggregatorFactory factory2 = jsonMapper.readValue( + serializedString, + AggregatorFactory.class + ); + + Assert.assertEquals(factory, factory2); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(HllSketchBuildAggregatorFactory.class).usingGetClass().verify(); + } + @Test public void testWithName() throws Exception { diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregatorTest.java index 2c4a31ef7164..f84e318fb047 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregatorTest.java +++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchToEstimatePostAggregatorTest.java @@ -94,6 +94,7 @@ public void testResultArraySignature() null, null, null, + null, false ) ) diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java index 99c93a1cedd2..498bb06d9afd 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/hll/sql/HllSketchSqlAggregatorTest.java @@ -24,6 +24,7 @@ import com.google.inject.Injector; import org.apache.druid.common.config.NullHandling; import org.apache.druid.guice.DruidInjectorBuilder; +import org.apache.druid.java.util.common.StringEncoding; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.PeriodGranularity; @@ -32,10 +33,12 @@ import org.apache.druid.query.Druids; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.aggregation.datasketches.SketchQueryContext; import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory; import 
org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory; @@ -61,6 +64,8 @@ import org.apache.druid.query.topn.TopNQueryBuilder; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.join.JoinableFactoryWrapper; @@ -84,11 +89,128 @@ import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.stream.Collectors; public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest { private static final boolean ROUND = true; + // For testHllSketchPostAggsGroupBy, testHllSketchPostAggsTimeseries + private static final Object[] EXPECTED_PA_RESULT = + new Object[]{ + "\"AgEHDAMIAgDhUv8P63iABQ==\"", + "\"AgEHDAMIBgALpZ0PjpTfBY5ElQo+C7UE4jA+DKfcYQQ=\"", + "\"AgEHDAMIAQAr8vsG\"", + 2.000000004967054d, + 3.000000004967054d, + 3.000000014901161d, + 2.000000004967054d, + "[2.000000004967054,2.0,2.0001997319422404]", + "[2.000000004967054,2.0,2.000099863468538]", + "\"AgEHDAMIBgC1EYgH1mlHBwsKPwu5SK8MIiUxB7iZVwU=\"", + 2L, + "### HLL SKETCH SUMMARY: \n" + + " Log Config K : 12\n" + + " Hll Target : HLL_4\n" + + " Current Mode : LIST\n" + + " Memory : false\n" + + " LB : 2.0\n" + + " Estimate : 2.000000004967054\n" + + " UB : 2.000099863468538\n" + + " OutOfOrder Flag: false\n" + + " Coupon Count : 2\n", + "### HLL SKETCH SUMMARY: \n" + + " LOG CONFIG K : 12\n" + + " HLL TARGET : HLL_4\n" + + " CURRENT MODE : LIST\n" + + " MEMORY : FALSE\n" + + " LB : 2.0\n" + + " ESTIMATE : 2.000000004967054\n" + + " UB : 2.000099863468538\n" + + " OUTOFORDER FLAG: FALSE\n" + + " COUPON COUNT : 2\n", + 2.0, + 2L + }; + + /** + * Expected virtual columns for {@link #testHllSketchPostAggsTimeseries()}, + * {@link #testHllSketchPostAggsGroupBy()}, 
{@link #testHllSketchFilteredAggregatorsTimeseries()}, and + * {@link #testHllSketchFilteredAggregatorsGroupBy()}. + */ + private static final List EXPECTED_PA_VIRTUAL_COLUMNS = + ImmutableList.of( + new ExpressionVirtualColumn( + "v0", + "concat(\"dim2\",'hello')", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + ), + new ExpressionVirtualColumn( + "v1", + "pow(abs((\"m1\" + 100)),2)", + ColumnType.DOUBLE, + TestExprMacroTable.INSTANCE + ) + ); + + /** + * Expected aggregators for {@link #testHllSketchPostAggsTimeseries()} and {@link #testHllSketchPostAggsGroupBy()}. + */ + private static final List EXPECTED_PA_AGGREGATORS = + ImmutableList.of( + new HllSketchBuildAggregatorFactory("a0", "dim2", null, null, null, false, true), + new HllSketchBuildAggregatorFactory("a1", "m1", null, null, null, false, true), + new HllSketchBuildAggregatorFactory("a2", "cnt", null, null, null, false, true), + new HllSketchBuildAggregatorFactory("a3", "v0", null, null, null, false, true), + new HllSketchBuildAggregatorFactory("a4", "v1", null, null, null, false, true), + new HllSketchBuildAggregatorFactory("a5", "dim2", null, null, null, true, true), + new HllSketchBuildAggregatorFactory("a6", "dim2", null, null, StringEncoding.UTF8, true, true) + ); + + /** + * Expected aggregators for {@link #testHllSketchFilteredAggregatorsTimeseries()} and + * {@link #testHllSketchFilteredAggregatorsGroupBy()}. + */ + private static final List EXPECTED_FILTERED_AGGREGATORS = + EXPECTED_PA_AGGREGATORS.stream() + .limit(5) + .map(factory -> new FilteredAggregatorFactory(factory, selector("dim2", "a", null))) + .collect(Collectors.toList()); + + /** + * Expected post-aggregators for {@link #testHllSketchPostAggsTimeseries()} and + * {@link #testHllSketchPostAggsGroupBy()}. 
+ */ + private static final List EXPECTED_PA_POST_AGGREGATORS = + ImmutableList.of( + new HllSketchToEstimatePostAggregator("p1", new FieldAccessPostAggregator("p0", "a0"), false), + new HllSketchToEstimatePostAggregator("p3", new FieldAccessPostAggregator("p2", "a0"), false), + new ExpressionPostAggregator("p4", "(\"p3\" + 1)", null, TestExprMacroTable.INSTANCE), + new HllSketchToEstimatePostAggregator("p6", new FieldAccessPostAggregator("p5", "a3"), false), + new HllSketchToEstimatePostAggregator("p8", new FieldAccessPostAggregator("p7", "a0"), false), + new ExpressionPostAggregator("p9", "abs(\"p8\")", null, TestExprMacroTable.INSTANCE), + new HllSketchToEstimateWithBoundsPostAggregator("p11", new FieldAccessPostAggregator("p10", "a0"), 2), + new HllSketchToEstimateWithBoundsPostAggregator("p13", new FieldAccessPostAggregator("p12", "a0"), 1), + new HllSketchToStringPostAggregator("p15", new FieldAccessPostAggregator("p14", "a0")), + new HllSketchToStringPostAggregator("p17", new FieldAccessPostAggregator("p16", "a0")), + new ExpressionPostAggregator("p18", "upper(\"p17\")", null, TestExprMacroTable.INSTANCE), + new HllSketchToEstimatePostAggregator("p20", new FieldAccessPostAggregator("p19", "a0"), true) + ); + + /** + * Expected post-aggregators for {@link #testHllSketchFilteredAggregatorsTimeseries()} and + * {@link #testHllSketchFilteredAggregatorsGroupBy()}. 
+ */ + private static final List EXPECTED_FILTERED_POST_AGGREGATORS = + ImmutableList.of( + new HllSketchToEstimatePostAggregator("p1", new FieldAccessPostAggregator("p0", "a0"), false), + new HllSketchToEstimatePostAggregator("p3", new FieldAccessPostAggregator("p2", "a1"), false), + new HllSketchToEstimatePostAggregator("p5", new FieldAccessPostAggregator("p4", "a2"), false), + new HllSketchToEstimatePostAggregator("p7", new FieldAccessPostAggregator("p6", "a3"), false), + new HllSketchToEstimatePostAggregator("p9", new FieldAccessPostAggregator("p8", "a4"), false) + ); + private static final ExprMacroTable MACRO_TABLE = new ExprMacroTable( ImmutableList.of( new HllPostAggExprMacros.HLLSketchEstimateExprMacro() @@ -133,6 +255,7 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( "dim1", null, null, + null, false, ROUND ), @@ -141,6 +264,7 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( "dim3", null, null, + null, false, false ) @@ -217,44 +341,16 @@ public void testApproxCountDistinctHllSketch() .aggregators( ImmutableList.of( new LongSumAggregatorFactory("a0", "cnt"), - new HllSketchBuildAggregatorFactory( - "a1", - "dim2", - null, - null, - null, - ROUND - ), + new HllSketchBuildAggregatorFactory("a1", "dim2", null, null, null, null, ROUND), new FilteredAggregatorFactory( - new HllSketchBuildAggregatorFactory( - "a2", - "dim2", - null, - null, - null, - ROUND - ), + new HllSketchBuildAggregatorFactory("a2", "dim2", null, null, null, null, ROUND), BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null)) ), - new HllSketchBuildAggregatorFactory( - "a3", - "v0", - null, - null, - null, - ROUND - ), - new HllSketchBuildAggregatorFactory( - "a4", - "v1", - null, - null, - null, - ROUND - ), - new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8", null, ROUND), - new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, null, ROUND), - new HllSketchMergeAggregatorFactory("a7", 
"hllsketch_dim1", 21, "HLL_4", null, ROUND) + new HllSketchBuildAggregatorFactory("a3", "v0", null, null, null, null, ROUND), + new HllSketchBuildAggregatorFactory("a4", "v1", null, null, null, null, ROUND), + new HllSketchMergeAggregatorFactory("a5", "hllsketch_dim1", 21, "HLL_8", null, null, ROUND), + new HllSketchMergeAggregatorFactory("a6", "hllsketch_dim1", null, null, null, null, ROUND), + new HllSketchMergeAggregatorFactory("a7", "hllsketch_dim1", 21, "HLL_4", null, null, ROUND) ) ) .context(QUERY_CONTEXT_DEFAULT) @@ -303,6 +399,7 @@ public void testAvgDailyCountDistinctHllSketch() null, null, null, + null, ROUND ) ) @@ -380,7 +477,7 @@ public void testApproxCountDistinctHllSketchIsRounded() .setGranularity(Granularities.ALL) .setAggregatorSpecs( aggregators( - new HllSketchBuildAggregatorFactory("a0", "m1", null, null, null, true) + new HllSketchBuildAggregatorFactory("a0", "m1", null, null, null, true, true) ) ) .setHavingSpec(having(selector("a0", "2", null))) @@ -399,33 +496,103 @@ public void testApproxCountDistinctHllSketchIsRounded() } @Test - public void testHllSketchPostAggs() + public void testHllSketchFilteredAggregatorsGroupBy() { - final String sketchSummary = "### HLL SKETCH SUMMARY: \n" - + " Log Config K : 12\n" - + " Hll Target : HLL_4\n" - + " Current Mode : LIST\n" - + " Memory : false\n" - + " LB : 2.0\n" - + " Estimate : 2.000000004967054\n" - + " UB : 2.000099863468538\n" - + " OutOfOrder Flag: false\n" - + " Coupon Count : 2\n"; + testQuery( + "SELECT\n" + + " DS_HLL(dim2) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " DS_HLL(m1) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " DS_HLL(cnt) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " DS_HLL(CONCAT(dim2, 'hello')) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " DS_HLL(POWER(ABS(m1 + 100), 2)) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2) FILTER(WHERE MV_CONTAINS(dim2, 'a'))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(m1) FILTER(WHERE MV_CONTAINS(dim2, 
'a'))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(cnt) FILTER(WHERE MV_CONTAINS(dim2, 'a'))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(CONCAT(dim2, 'hello')) FILTER(WHERE MV_CONTAINS(dim2, 'a'))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(POWER(ABS(m1 + 100), 2)) FILTER(WHERE MV_CONTAINS(dim2, 'a')))\n" + + "FROM druid.foo\n" + + "GROUP BY cnt", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(new MultipleIntervalSegmentSpec(Collections.singletonList(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + .setVirtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS)) + .setDimensions(new DefaultDimensionSpec("cnt", "d0", ColumnType.LONG)) + .setAggregatorSpecs(EXPECTED_FILTERED_AGGREGATORS) + .setPostAggregatorSpecs(EXPECTED_FILTERED_POST_AGGREGATORS) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{ + "\"AgEHDAMIAQDhUv8P\"", + "\"AgEHDAMIAgALpZ0PPgu1BA==\"", + "\"AgEHDAMIAQAr8vsG\"", + "\"AgEHDAMIAQCba0kG\"", + "\"AgEHDAMIAgC1EYgHuUivDA==\"", + 1.0, + 2.000000004967054, + 1.0, + 1.0, + 2.000000004967054 + } + ) + ); + } - final String otherSketchSummary = "### HLL SKETCH SUMMARY: \n" - + " LOG CONFIG K : 12\n" - + " HLL TARGET : HLL_4\n" - + " CURRENT MODE : LIST\n" - + " MEMORY : FALSE\n" - + " LB : 2.0\n" - + " ESTIMATE : 2.000000004967054\n" - + " UB : 2.000099863468538\n" - + " OUTOFORDER FLAG: FALSE\n" - + " COUPON COUNT : 2\n"; + @Test + public void testHllSketchFilteredAggregatorsTimeseries() + { + testQuery( + "SELECT\n" + + " DS_HLL(dim2) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " DS_HLL(m1) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " DS_HLL(cnt) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " DS_HLL(CONCAT(dim2, 'hello')) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " DS_HLL(POWER(ABS(m1 + 100), 2)) FILTER(WHERE MV_CONTAINS(dim2, 'a')),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2) FILTER(WHERE MV_CONTAINS(dim2, 'a'))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(m1) 
FILTER(WHERE MV_CONTAINS(dim2, 'a'))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(cnt) FILTER(WHERE MV_CONTAINS(dim2, 'a'))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(CONCAT(dim2, 'hello')) FILTER(WHERE MV_CONTAINS(dim2, 'a'))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(POWER(ABS(m1 + 100), 2)) FILTER(WHERE MV_CONTAINS(dim2, 'a')))\n" + + "FROM druid.foo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) + .granularity(Granularities.ALL) + .virtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS)) + .aggregators(EXPECTED_FILTERED_AGGREGATORS) + .postAggregators(EXPECTED_FILTERED_POST_AGGREGATORS) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{ + "\"AgEHDAMIAQDhUv8P\"", + "\"AgEHDAMIAgALpZ0PPgu1BA==\"", + "\"AgEHDAMIAQAr8vsG\"", + "\"AgEHDAMIAQCba0kG\"", + "\"AgEHDAMIAgC1EYgHuUivDA==\"", + 1.0, + 2.000000004967054, + 1.0, + 1.0, + 2.000000004967054 + } + ) + ); + } + + @Test + public void testHllSketchPostAggsGroupBy() + { testQuery( "SELECT\n" + " DS_HLL(dim2),\n" + " DS_HLL(m1),\n" + + " DS_HLL(cnt),\n" + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)),\n" + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)) + 1,\n" + " HLL_SKETCH_ESTIMATE(DS_HLL(CONCAT(dim2, 'hello'))),\n" @@ -436,119 +603,59 @@ public void testHllSketchPostAggs() + " APPROX_COUNT_DISTINCT_DS_HLL(dim2),\n" + " HLL_SKETCH_TO_STRING(DS_HLL(dim2)),\n" + " UPPER(HLL_SKETCH_TO_STRING(DS_HLL(dim2))),\n" - + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2), true)\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2), true)\n," + + " APPROX_COUNT_DISTINCT_DS_HLL_UTF8(dim2)\n" + + "FROM druid.foo\n" + + "GROUP BY cnt", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(new MultipleIntervalSegmentSpec(Collections.singletonList(Filtration.eternity()))) + .setGranularity(Granularities.ALL) + 
.setVirtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS)) + .setDimensions(new DefaultDimensionSpec("cnt", "d0", ColumnType.LONG)) + .setAggregatorSpecs(EXPECTED_PA_AGGREGATORS) + .setPostAggregatorSpecs(EXPECTED_PA_POST_AGGREGATORS) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of(EXPECTED_PA_RESULT) + ); + } + + @Test + public void testHllSketchPostAggsTimeseries() + { + testQuery( + "SELECT\n" + + " DS_HLL(dim2),\n" + + " DS_HLL(m1),\n" + + " DS_HLL(cnt),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2)) + 1,\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(CONCAT(dim2, 'hello'))),\n" + + " ABS(HLL_SKETCH_ESTIMATE(DS_HLL(dim2))),\n" + + " HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(DS_HLL(dim2), 2),\n" + + " HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS(DS_HLL(dim2)),\n" + + " DS_HLL(POWER(ABS(m1 + 100), 2)),\n" + + " APPROX_COUNT_DISTINCT_DS_HLL(dim2),\n" + + " HLL_SKETCH_TO_STRING(DS_HLL(dim2)),\n" + + " UPPER(HLL_SKETCH_TO_STRING(DS_HLL(dim2))),\n" + + " HLL_SKETCH_ESTIMATE(DS_HLL(dim2), true),\n" + + " APPROX_COUNT_DISTINCT_DS_HLL_UTF8(dim2)\n" + "FROM druid.foo", ImmutableList.of( Druids.newTimeseriesQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()))) .granularity(Granularities.ALL) - .virtualColumns( - new ExpressionVirtualColumn( - "v0", - "concat(\"dim2\",'hello')", - ColumnType.STRING, - TestExprMacroTable.INSTANCE - ), - new ExpressionVirtualColumn( - "v1", - "pow(abs((\"m1\" + 100)),2)", - ColumnType.DOUBLE, - TestExprMacroTable.INSTANCE - ) - ) - .aggregators( - ImmutableList.of( - new HllSketchBuildAggregatorFactory( - "a0", - "dim2", - null, - null, - false, - true - ), - new HllSketchBuildAggregatorFactory( - "a1", - "m1", - null, - null, - false, - true - ), - new HllSketchBuildAggregatorFactory( - "a2", - "v0", - null, - null, - false, - true - ), - new HllSketchBuildAggregatorFactory( - "a3", - "v1", - null, - 
null, - false, - true - ), - new HllSketchBuildAggregatorFactory( - "a4", - "dim2", - null, - null, - null, - true - ) - ) - ) - .postAggregators( - ImmutableList.of( - new HllSketchToEstimatePostAggregator("p1", new FieldAccessPostAggregator("p0", "a0"), false), - new HllSketchToEstimatePostAggregator("p3", new FieldAccessPostAggregator("p2", "a0"), false), - new ExpressionPostAggregator("p4", "(\"p3\" + 1)", null, TestExprMacroTable.INSTANCE), - new HllSketchToEstimatePostAggregator("p6", new FieldAccessPostAggregator("p5", "a2"), false), - new HllSketchToEstimatePostAggregator( - "p8", - new FieldAccessPostAggregator("p7", "a0"), - false - ), - new ExpressionPostAggregator("p9", "abs(\"p8\")", null, TestExprMacroTable.INSTANCE), - new HllSketchToEstimateWithBoundsPostAggregator( - "p11", - new FieldAccessPostAggregator("p10", "a0"), - 2 - ), - new HllSketchToEstimateWithBoundsPostAggregator( - "p13", - new FieldAccessPostAggregator("p12", "a0"), - 1 - ), - new HllSketchToStringPostAggregator("p15", new FieldAccessPostAggregator("p14", "a0")), - new HllSketchToStringPostAggregator("p17", new FieldAccessPostAggregator("p16", "a0")), - new ExpressionPostAggregator("p18", "upper(\"p17\")", null, TestExprMacroTable.INSTANCE), - new HllSketchToEstimatePostAggregator("p20", new FieldAccessPostAggregator("p19", "a0"), true) - ) - ) + .virtualColumns(VirtualColumns.create(EXPECTED_PA_VIRTUAL_COLUMNS)) + .aggregators(EXPECTED_PA_AGGREGATORS) + .postAggregators(EXPECTED_PA_POST_AGGREGATORS) .context(QUERY_CONTEXT_DEFAULT) .build() ), - ImmutableList.of( - new Object[]{ - "\"AgEHDAMIAgDhUv8P63iABQ==\"", - "\"AgEHDAMIBgALpZ0PjpTfBY5ElQo+C7UE4jA+DKfcYQQ=\"", - 2.000000004967054d, - 3.000000004967054d, - 3.000000014901161d, - 2.000000004967054d, - "[2.000000004967054,2.0,2.0001997319422404]", - "[2.000000004967054,2.0,2.000099863468538]", - "\"AgEHDAMIBgC1EYgH1mlHBwsKPwu5SK8MIiUxB7iZVwU=\"", - 2L, - sketchSummary, - otherSketchSummary, - 2.0 - } - ) + 
ImmutableList.of(EXPECTED_PA_RESULT) ); } @@ -620,46 +727,11 @@ public void testHllSketchPostAggsFinalizeOuterSketches() ) .aggregators( ImmutableList.of( - new HllSketchBuildAggregatorFactory( - "a0", - "dim2", - null, - null, - null, - true - ), - new HllSketchBuildAggregatorFactory( - "a1", - "m1", - null, - null, - null, - true - ), - new HllSketchBuildAggregatorFactory( - "a2", - "v0", - null, - null, - null, - true - ), - new HllSketchBuildAggregatorFactory( - "a3", - "v1", - null, - null, - null, - true - ), - new HllSketchBuildAggregatorFactory( - "a4", - "dim2", - null, - null, - null, - true - ) + new HllSketchBuildAggregatorFactory("a0", "dim2", null, null, null, true, true), + new HllSketchBuildAggregatorFactory("a1", "m1", null, null, null, true, true), + new HllSketchBuildAggregatorFactory("a2", "v0", null, null, null, true, true), + new HllSketchBuildAggregatorFactory("a3", "v1", null, null, null, true, true), + new HllSketchBuildAggregatorFactory("a4", "dim2", null, null, null, true, true) ) ) .postAggregators( @@ -743,6 +815,7 @@ public void testtHllSketchPostAggsPostSort() "dim2", null, null, + null, false, true ) @@ -789,6 +862,7 @@ public void testEmptyTimeseriesResults() null, null, null, + null, true ), new HllSketchBuildAggregatorFactory( @@ -796,6 +870,7 @@ public void testEmptyTimeseriesResults() "dim2", null, null, + null, false, true ) @@ -833,6 +908,7 @@ public void testGroupByAggregatorDefaultValues() null, null, null, + null, true ), selector("dim1", "nonexistent", null) @@ -843,6 +919,7 @@ public void testGroupByAggregatorDefaultValues() "v0", null, null, + null, false, true ), @@ -884,25 +961,11 @@ public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches() .setAggregatorSpecs( aggregators( new FilteredAggregatorFactory( - new HllSketchBuildAggregatorFactory( - "a0", - "v0", - null, - null, - null, - true - ), + new HllSketchBuildAggregatorFactory("a0", "v0", null, null, null, null, true), selector("dim1", "nonexistent", 
null) ), new FilteredAggregatorFactory( - new HllSketchBuildAggregatorFactory( - "a1", - "v0", - null, - null, - null, - true - ), + new HllSketchBuildAggregatorFactory("a1", "v0", null, null, null, null, true), selector("dim1", "nonexistent", null) ) ) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index fccaa83f3e3b..3c40affa7834 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -172,7 +172,7 @@ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExis // FloatSumAggregator combine method takes in two Float but return Double new FloatSumAggregatorFactory("sum_added", "added"), new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null), - new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false, false), + new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), null, false, false), new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null) }, false @@ -266,7 +266,15 @@ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExis new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added"), new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null), - new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false, false), + new HllSketchBuildAggregatorFactory( + "HLLSketchBuild", + "user", + 12, + TgtHllType.HLL_4.name(), + null, + false, + false + ), new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L, null) }, false diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/StringEncoding.java b/processing/src/main/java/org/apache/druid/java/util/common/StringEncoding.java new file mode 100644 index 000000000000..c5775d10b27c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/common/StringEncoding.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.primitives.SignedBytes; + +/** + * An enum that provides a way for users to specify what encoding should be used when hashing strings. + * + * The main reason for this setting's existence is getting the best performance possible. When operating on memory + * mapped segments -- which store strings as UTF-8 -- it is fastest to use "UTF8". When operating on the result of + * expressions, or on an in-heap IncrementalIndex -- which use Java strings -- it is fastest to use "UTF16LE". + * + * This decision cannot be made locally, because different encodings do not generate equivalent hashes, and therefore + * they are not mergeable. 
The decision must be made globally by the end user or by the SQL planner, and should be + * based on where most input strings are expected to come from. + * + * Currently, UTF8 and UTF16LE are the only two options, because there are no situations where other options would be + * higher-performing. + */ +public enum StringEncoding implements Cacheable +{ + // Do not change order; the ordinal is used by cache keys. Add new ones at the end. + + UTF8, + UTF16LE /* Treat the result of str.toCharArray() as a bag of bytes in little-endian order */; + + @JsonCreator + public static StringEncoding fromString(final String name) + { + return valueOf(StringUtils.toUpperCase(name)); + } + + @Override + public byte[] getCacheKey() + { + return new byte[]{SignedBytes.checkedCast(ordinal())}; + } + + @JsonValue + @Override + public String toString() + { + return StringUtils.toLowerCase(this.name()); + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/common/StringEncodingDefaultUTF16LEJsonIncludeFilter.java b/processing/src/main/java/org/apache/druid/java/util/common/StringEncodingDefaultUTF16LEJsonIncludeFilter.java new file mode 100644 index 000000000000..332df030856e --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/common/StringEncodingDefaultUTF16LEJsonIncludeFilter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import com.fasterxml.jackson.annotation.JsonInclude; + +/** + * {@link JsonInclude} filter for {@link StringEncoding} that ignores UTF16LE, which is the typical default + * for aggregators. + * + * This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs + * exclusions (see spotbugs-exclude.xml). + */ +@SuppressWarnings({"EqualsAndHashcode", "EqualsHashCode", "EqualsWhichDoesntCheckParameterClass"}) +public class StringEncodingDefaultUTF16LEJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode] +{ + @Override + public boolean equals(Object obj) + { + return obj == StringEncoding.UTF16LE; + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/common/StringEncodingDefaultUTF16LEJsonIncludeFilterTest.java b/processing/src/test/java/org/apache/druid/java/util/common/StringEncodingDefaultUTF16LEJsonIncludeFilterTest.java new file mode 100644 index 000000000000..4b93291d779c --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/common/StringEncodingDefaultUTF16LEJsonIncludeFilterTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import org.junit.Assert; +import org.junit.Test; + +public class StringEncodingDefaultUTF16LEJsonIncludeFilterTest +{ + private final StringEncodingDefaultUTF16LEJsonIncludeFilter filter = + new StringEncodingDefaultUTF16LEJsonIncludeFilter(); + + @Test + @SuppressWarnings({"SimplifiableAssertion", "EqualsBetweenInconvertibleTypes"}) + public void testFilter() + { + Assert.assertTrue(filter.equals(StringEncoding.UTF16LE)); + Assert.assertFalse(filter.equals(StringEncoding.UTF8)); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/common/StringEncodingTest.java b/processing/src/test/java/org/apache/druid/java/util/common/StringEncodingTest.java new file mode 100644 index 000000000000..b549e63c8721 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/common/StringEncodingTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; + +public class StringEncodingTest +{ + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = new ObjectMapper(); + + Assert.assertEquals( + StringEncoding.UTF16LE, + mapper.readValue(mapper.writeValueAsString(StringEncoding.UTF16LE), StringEncoding.class) + ); + + Assert.assertEquals( + StringEncoding.UTF8, + mapper.readValue(mapper.writeValueAsString(StringEncoding.UTF8), StringEncoding.class) + ); + } + + @Test + public void testGetCacheKey() + { + Assert.assertFalse( + Arrays.equals( + StringEncoding.UTF8.getCacheKey(), + StringEncoding.UTF16LE.getCacheKey() + ) + ); + } +}