diff --git a/docs/content/development/extensions-core/bloom-filter.md b/docs/content/development/extensions-core/bloom-filter.md index 3b83ff02621c..f878e75b2b3b 100644 --- a/docs/content/development/extensions-core/bloom-filter.md +++ b/docs/content/development/extensions-core/bloom-filter.md @@ -24,22 +24,44 @@ title: "Bloom Filter" # Bloom Filter -Make sure to [include](../../operations/including-extensions.html) `druid-bloom-filter` as an extension. +This extension adds the ability to both construct bloom filters from query results, and filter query results by testing +against a bloom filter. Make sure to [include](../../operations/including-extensions.html) `druid-bloom-filter` as an +extension. -BloomFilter is a probabilistic data structure for set membership check. -Following are some characterstics of BloomFilter +A BloomFilter is a probabilistic data structure for performing a set membership check. A bloom filter is a good candidate +to use with Druid for cases where an explicit filter is impossible, e.g. filtering a query against a set of millions of + values. + +Following are some characteristics of BloomFilters: - BloomFilters are highly space efficient when compared to using a HashSet. -- Because of the probabilistic nature of bloom filter false positive (element not present in bloom filter but test() says true) are possible -- false negatives are not possible (if element is present then test() will never say false). -- The false positive probability is configurable (default: 5%) depending on which storage requirement may increase or decrease. -- Lower the false positive probability greater is the space requirement. -- Bloom filters are sensitive to number of elements that will be inserted in the bloom filter. -- During the creation of bloom filter expected number of entries must be specified.If the number of insertions exceed the specified initial number of entries then false positive probability will increase accordingly. 
+- Because of the probabilistic nature of bloom filters, false positive results are possible (an element was not actually
+inserted into a bloom filter during construction, but `test()` says true)
+- False negatives are not possible (if an element is present then `test()` will never say false).
+- The false positive probability of this implementation is currently fixed at 5%, but increasing the number of entries
+that the filter can hold can decrease this false positive rate, at the cost of overall size.
+- Bloom filters are sensitive to the number of elements that will be inserted in the bloom filter. The expected number
+of entries must be specified when a bloom filter is created. If the number of insertions exceeds the specified initial
+number of entries, then the false positive probability will increase accordingly.
+
+This extension is currently based on `org.apache.hive.common.util.BloomKFilter` from `hive-storage-api`. Internally,
+this implementation uses Murmur3 as the hash algorithm.
+
+To construct a BloomKFilter externally with Java to use as a filter in a Druid query:
+
+```java
+BloomKFilter bloomFilter = new BloomKFilter(1500);
+bloomFilter.addString("value 1");
+bloomFilter.addString("value 2");
+bloomFilter.addString("value 3");
+ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+BloomKFilter.serialize(byteArrayOutputStream, bloomFilter);
+String base64Serialized = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+```
-Internally, this implementation of bloom filter uses Murmur3 fast non-cryptographic hash algorithm.
+This string can then be used in a native or SQL Druid query.
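Since `org.apache.hive.common.util.BloomKFilter` ships in `hive-storage-api` rather than the JDK, here is a self-contained toy sketch of the no-false-negatives / possible-false-positives behavior described above. This is an illustration only: the class name, bit count, and double-hashing scheme are assumptions for the demo, not the hive implementation used by this extension.

```java
import java.util.BitSet;

// Toy bloom filter for illustration only; NOT the hive BloomKFilter used by
// this extension (whose hashing is Murmur3 and whose layout differs).
class ToyBloomFilter {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;

  ToyBloomFilter(int numBits, int numHashes) {
    this.numBits = numBits;
    this.numHashes = numHashes;
    this.bits = new BitSet(numBits);
  }

  // Derive the i-th bit index from two base hashes (double hashing).
  private int index(String value, int i) {
    int h1 = value.hashCode();
    int h2 = Integer.rotateLeft(h1, 16) ^ 0x9e3779b9;
    return Math.floorMod(h1 + i * h2, numBits);
  }

  void addString(String value) {
    for (int i = 0; i < numHashes; i++) {
      bits.set(index(value, i));
    }
  }

  boolean test(String value) {
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(index(value, i))) {
        return false; // at least one bit unset: definitely not present
      }
    }
    return true; // all bits set: probably present (false positives possible)
  }

  public static void main(String[] args) {
    ToyBloomFilter filter = new ToyBloomFilter(1024, 3);
    filter.addString("value 1");
    filter.addString("value 2");
    // No false negatives: anything inserted always tests true.
    System.out.println(filter.test("value 1")); // true
    // An uninserted value usually tests false; true here would be a false positive.
    System.out.println(filter.test("value 3"));
  }
}
```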
-### JSON Representation of Bloom Filter
+## Filtering queries with a Bloom Filter
+### JSON Specification of Bloom Filter
 ```json
 {
   "type" : "bloom",
@@ -75,7 +97,6 @@
 Bloom filters are supported in SQL via the `bloom_filter_test` operator:
 ```sql
 SELECT COUNT(*) FROM druid.foo WHERE bloom_filter_test(<expr>, '<serialized_bloom_filter_base64>')
 ```
-
 ### Expression and Virtual Column Support
 The bloom filter extension also adds a bloom filter [Druid expression](../../misc/math-expr.html) which shares syntax
@@ -83,4 +104,61 @@
 with the SQL operator.
 ```sql
 bloom_filter_test(<expr>, '<serialized_bloom_filter_base64>')
-``` \ No newline at end of file
+```
+
+## Bloom Filter Query Aggregator
+
+Input for a `BloomKFilter` can also be created from a Druid query with the `bloom` aggregator.
+
+### JSON Specification of Bloom Filter Aggregator
+
+```json
+{
+  "type": "bloom",
+  "name": <output_field_name>,
+  "maxNumEntries": <maximum_number_of_entries>,
+  "field": <dimension_spec>
+}
+```
+
+|Property |Description |required? |
+|-------------------------|------------------------------|----------------------------------|
+|`type` |Aggregator Type. Should always be `bloom`|yes|
+|`name` |Output field name |yes|
+|`field` |[DimensionSpec](./../dimensionspecs.html) to add to `org.apache.hive.common.util.BloomKFilter` | yes |
+|`maxNumEntries` |Maximum number of distinct values supported by `org.apache.hive.common.util.BloomKFilter`, default `1500`| no |
+
+### Example
+
+```json
+{
+  "queryType": "timeseries",
+  "dataSource": "wikiticker",
+  "intervals": [ "2015-09-12T00:00:00.000/2015-09-13T00:00:00.000" ],
+  "granularity": "day",
+  "aggregations": [
+    {
+      "type": "bloom",
+      "name": "userBloom",
+      "maxNumEntries": 100000,
+      "field": {
+        "type":"default",
+        "dimension":"user",
+        "outputType": "STRING"
+      }
+    }
+  ]
+}
+```
+
+response
+
+```json
+[{"timestamp":"2015-09-12T00:00:00.000Z","result":{"userBloom":"BAAAJhAAAA..."}}]
+```
+
+These values can then be set in the filter specification above.
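The base64 `userBloom` value in the response is the serialized filter. Per the buffer aggregator in this patch, the serialized layout is `| k (byte) | numLongs (int) | bitset (long[numLongs]) |`, so the total size is `1 + Integer.BYTES + numLongs * Long.BYTES`. A sketch of reading that header, using a synthetic payload (the class name and the payload contents are assumptions for illustration; a real payload would come from Base64-decoding a query response):

```java
import java.nio.ByteBuffer;

class BloomPayloadSize {
  // Compute serialized size from the header:
  // | k (byte) | numLongs (int) | bitset (long[numLongs]) |
  static int sizeBytes(ByteBuffer buf, int position) {
    int numLongs = buf.getInt(position + 1); // skip the 1-byte k
    return 1 + Integer.BYTES + numLongs * Long.BYTES;
  }

  public static void main(String[] args) {
    // Synthetic payload standing in for a decoded "userBloom" value.
    int numLongs = 4;
    ByteBuffer payload = ByteBuffer.allocate(1 + Integer.BYTES + numLongs * Long.BYTES);
    payload.put((byte) 3);    // k: number of hash functions
    payload.putInt(numLongs); // numLongs: length of the bitset array
    for (int i = 0; i < numLongs; i++) {
      payload.putLong(0L);    // empty bitset
    }
    payload.flip();
    System.out.println(sizeBytes(payload, 0)); // 37 = 1 + 4 + 4 * 8
  }
}
```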
+ +Ordering results by a bloom filter aggregator, for example in a TopN query, will perform a comparatively expensive +linear scan _of the filter itself_ to count the number of set bits as a means of approximating how many items have been +added to the set. As such, ordering by an alternate aggregation is recommended if possible. \ No newline at end of file diff --git a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DerivativeDataSourceManager.java b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DerivativeDataSourceManager.java index 45ab136e443d..35a5c282ef8a 100644 --- a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DerivativeDataSourceManager.java +++ b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DerivativeDataSourceManager.java @@ -210,7 +210,7 @@ public Pair map(int index, ResultSet r, St } /** - * caculate the average data size per segment granularity for a given datasource. + * calculate the average data size per segment granularity for a given datasource. * * e.g. 
for a datasource, there're 5 segments as follows, * interval = "2018-04-01/2017-04-02", segment size = 1024 * 1024 * 2 diff --git a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewUtils.java b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewUtils.java index 92eff78a0315..2e96c941fa57 100644 --- a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewUtils.java +++ b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/MaterializedViewUtils.java @@ -85,7 +85,7 @@ private static Set extractFieldsFromAggregations(List } /** - * caculate the intervals which are covered by interval2, but not covered by interval1. + * calculate the intervals which are covered by interval2, but not covered by interval1. * result intervals = interval2 - interval1 ∩ interval2 * e.g. 
* a list of interval2: ["2018-04-01T00:00:00.000Z/2018-04-02T00:00:00.000Z", diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java index 5162abb1d059..0ac4f728a243 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java @@ -27,9 +27,12 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.druid.query.aggregation.bloom.BloomFilterAggregatorFactory; +import org.apache.druid.query.aggregation.bloom.BloomFilterSerde; import org.apache.druid.query.filter.BloomDimFilter; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.filter.BloomKFilterHolder; +import org.apache.druid.segment.serde.ComplexMetrics; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -41,10 +44,17 @@ public class BloomFilterSerializersModule extends SimpleModule public BloomFilterSerializersModule() { - registerSubtypes(new NamedType(BloomDimFilter.class, BLOOM_FILTER_TYPE_NAME)); + registerSubtypes( + new NamedType(BloomDimFilter.class, BLOOM_FILTER_TYPE_NAME), + new NamedType(BloomFilterAggregatorFactory.class, BLOOM_FILTER_TYPE_NAME) + ); addSerializer(BloomKFilter.class, new BloomKFilterSerializer()); addDeserializer(BloomKFilter.class, new BloomKFilterDeserializer()); addDeserializer(BloomKFilterHolder.class, new BloomKFilterHolderDeserializer()); + + if (ComplexMetrics.getSerdeForType(BLOOM_FILTER_TYPE_NAME) == null) { + ComplexMetrics.registerSerde(BLOOM_FILTER_TYPE_NAME, new BloomFilterSerde()); + } } private static class BloomKFilterSerializer extends 
StdSerializer diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java new file mode 100644 index 000000000000..652236b7c68a --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseNullableColumnValueSelector; + +import javax.annotation.Nullable; + +public abstract class BaseBloomFilterAggregator implements Aggregator +{ + final BloomKFilter collector; + protected final TSelector selector; + + BaseBloomFilterAggregator(TSelector selector, BloomKFilter collector) + { + this.collector = collector; + this.selector = selector; + } + + @Nullable + @Override + public Object get() + { + return collector; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("BloomFilterAggregator does not support getFloat()"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("BloomFilterAggregator does not support getLong()"); + } + + @Override + public double getDouble() + { + throw new UnsupportedOperationException("BloomFilterAggregator does not support getDouble()"); + } + + @Override + public void close() + { + // nothing to close + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java new file mode 100644 index 000000000000..74def15c0626 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.BaseNullableColumnValueSelector; + +import java.nio.ByteBuffer; + +public abstract class BaseBloomFilterBufferAggregator implements BufferAggregator +{ + protected final int maxNumEntries; + protected final TSelector selector; + + BaseBloomFilterBufferAggregator(TSelector selector, int maxNumEntries) + { + this.selector = selector; + this.maxNumEntries = maxNumEntries; + } + + abstract void bufferAdd(ByteBuffer buf); + + @Override + public void init(ByteBuffer buf, int position) + { + final ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + BloomKFilter filter = new BloomKFilter(maxNumEntries); + BloomKFilter.serialize(mutationBuffer, filter); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + final int oldPosition = buf.position(); + buf.position(position); + bufferAdd(buf); + buf.position(oldPosition); + } + + + @Override + public Object get(ByteBuffer buf, int position) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + // | k (byte) | numLongs (int) | bitset (long[numLongs]) | + int sizeBytes = 1 + 
Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES); + mutationBuffer.limit(position + sizeBytes); + return mutationBuffer.slice(); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getFloat()"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getLong()"); + } + + @Override + public double getDouble(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("BloomFilterBufferAggregator does not support getDouble()"); + } + + @Override + public void close() + { + // nothing to close + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java new file mode 100644 index 000000000000..6fc4bf9379e4 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregateCombiner.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class BloomFilterAggregateCombiner extends ObjectAggregateCombiner +{ + @Nullable + private BloomKFilter combined; + + private final int maxNumEntries; + + public BloomFilterAggregateCombiner(int maxNumEntries) + { + this.maxNumEntries = maxNumEntries; + } + + @Override + public void reset(ColumnValueSelector selector) + { + combined = null; + fold(selector); + } + + @Override + public void fold(ColumnValueSelector selector) + { + BloomKFilter other = (BloomKFilter) selector.getObject(); + if (other == null) { + return; + } + if (combined == null) { + combined = new BloomKFilter(maxNumEntries); + } + combined.merge(other); + } + + @Nullable + @Override + public BloomKFilter getObject() + { + return combined; + } + + @Override + public Class classOfObject() + { + return BloomKFilter.class; + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java new file mode 100644 index 000000000000..af60135afe38 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -0,0 +1,296 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.guice.BloomFilterSerializersModule; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.AggregateCombiner; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import 
java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +public class BloomFilterAggregatorFactory extends AggregatorFactory +{ + private static final int DEFAULT_NUM_ENTRIES = 1500; + + private static final Comparator COMPARATOR = Comparator.nullsFirst((o1, o2) -> { + if (o1 instanceof ByteBuffer && o2 instanceof ByteBuffer) { + ByteBuffer buf1 = (ByteBuffer) o1; + ByteBuffer buf2 = (ByteBuffer) o2; + return Integer.compare( + BloomKFilter.getNumSetBits(buf1, buf1.position()), + BloomKFilter.getNumSetBits(buf2, buf2.position()) + ); + } else if (o1 instanceof BloomKFilter && o2 instanceof BloomKFilter) { + BloomKFilter o1f = (BloomKFilter) o1; + BloomKFilter o2f = (BloomKFilter) o2; + return Integer.compare(o1f.getNumSetBits(), o2f.getNumSetBits()); + } else { + throw new RE("Unable to compare unexpected types [%s]", o1.getClass().getName()); + } + }); + + private final String name; + private final DimensionSpec field; + private final int maxNumEntries; + + @JsonCreator + public BloomFilterAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("field") final DimensionSpec field, + @Nullable @JsonProperty("maxNumEntries") Integer maxNumEntries + ) + { + this.name = name; + this.field = field; + this.maxNumEntries = maxNumEntries != null ? 
maxNumEntries : DEFAULT_NUM_ENTRIES; + } + + @Override + public Aggregator factorize(ColumnSelectorFactory columnFactory) + { + BloomKFilter filter = new BloomKFilter(maxNumEntries); + ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension()); + + if (capabilities == null) { + BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); + if (selector instanceof NilColumnValueSelector) { + // BloomKFilter must be the same size so we cannot use a constant for the empty agg + return new EmptyBloomFilterAggregator(filter); + } + throw new IAE( + "Cannot create bloom filter buffer aggregator for column selector type [%s]", + selector.getClass().getName() + ); + } + ValueType type = capabilities.getType(); + switch (type) { + case STRING: + return new StringBloomFilterAggregator(columnFactory.makeDimensionSelector(field), filter); + case LONG: + return new LongBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter); + case FLOAT: + return new FloatBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter); + case DOUBLE: + return new DoubleBloomFilterAggregator(columnFactory.makeColumnValueSelector(field.getDimension()), filter); + default: + throw new IAE("Cannot create bloom filter aggregator for invalid column type [%s]", type); + } + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) + { + ColumnCapabilities capabilities = columnFactory.getColumnCapabilities(field.getDimension()); + + if (capabilities == null) { + BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); + if (selector instanceof NilColumnValueSelector) { + return new EmptyBloomFilterBufferAggregator(maxNumEntries); + } + throw new IAE( + "Cannot create bloom filter buffer aggregator for column selector type [%s]", + selector.getClass().getName() + ); + } + + 
ValueType type = capabilities.getType(); + switch (type) { + case STRING: + return new StringBloomFilterBufferAggregator(columnFactory.makeDimensionSelector(field), maxNumEntries); + case LONG: + return new LongBloomFilterBufferAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries + ); + case FLOAT: + return new FloatBloomFilterBufferAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries + ); + case DOUBLE: + return new DoubleBloomFilterBufferAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), maxNumEntries + ); + default: + throw new IAE("Cannot create bloom filter buffer aggregator for invalid column type [%s]", type); + } + } + + @Override + public Comparator getComparator() + { + return COMPARATOR; + } + + @Override + public Object combine(@Nullable Object lhs, @Nullable Object rhs) + { + if (rhs == null) { + return lhs; + } + if (lhs == null) { + return rhs; + } + ((BloomKFilter) lhs).merge((BloomKFilter) rhs); + return lhs; + } + + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new BloomFilterAggregateCombiner(maxNumEntries); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new BloomFilterMergeAggregatorFactory(name, name, maxNumEntries); + } + + @Override + public List getRequiredColumns() + { + return Collections.singletonList(new BloomFilterAggregatorFactory(name, field, maxNumEntries)); + } + + @Override + public Object deserialize(Object object) + { + if (object instanceof String) { + return ByteBuffer.wrap(StringUtils.decodeBase64String((String) object)); + } else { + return object; + } + } + + @Override + public Object finalizeComputation(Object object) + { + try { + if (object instanceof ByteBuffer) { + return BloomKFilter.deserialize((ByteBuffer) object); + } else if (object instanceof byte[]) { + return BloomKFilter.deserialize(ByteBuffer.wrap((byte[]) object)); + } else { + return object; + } + } + 
catch (IOException ioe) { + throw new RuntimeException("Failed to deserialize BloomKFilter", ioe); + } + } + + @JsonProperty + @Override + public String getName() + { + return name; + } + + @JsonProperty + public DimensionSpec getField() + { + return field; + } + + @JsonProperty + public int getMaxNumEntries() + { + return maxNumEntries; + } + + @Override + public List requiredFields() + { + return Collections.singletonList(field.getDimension()); + } + + @Override + public String getTypeName() + { + return BloomFilterSerializersModule.BLOOM_FILTER_TYPE_NAME; + } + + @Override + public int getMaxIntermediateSize() + { + return BloomKFilter.computeSizeBytes(maxNumEntries); + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(AggregatorUtil.BLOOM_FILTER_CACHE_TYPE_ID) + .appendCacheable(field) + .appendInt(maxNumEntries) + .build(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BloomFilterAggregatorFactory that = (BloomFilterAggregatorFactory) o; + return maxNumEntries == that.maxNumEntries && + Objects.equals(name, that.name) && + Objects.equals(field, that.field); + } + + @Override + public int hashCode() + { + return Objects.hash(name, field, maxNumEntries); + } + + @Override + public String toString() + { + return "BloomFilterAggregatorFactory{" + + "name='" + name + '\'' + + ", field=" + field + + ", maxNumEntries=" + maxNumEntries + + '}'; + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java new file mode 100644 index 000000000000..67d7a70cb418 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java @@ -0,0 +1,57 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.ColumnValueSelector; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator> +{ + public BloomFilterMergeAggregator(ColumnValueSelector selector, BloomKFilter collector) + { + super(selector, collector); + } + + @Override + public void aggregate() + { + Object other = selector.getObject(); + if (other != null) { + if (other instanceof BloomKFilter) { + collector.merge((BloomKFilter) other); + } else if (other instanceof ByteBuffer) { + // fun fact: because bloom filter agg factory deserialize returns a byte buffer to avoid unnecessary serde, + // but GroupByQueryEngine (group by v1) ends up trying to merge ByteBuffers from buffer aggs with this agg + // instead of the BloomFilterBufferMergeAggregator. fun! Also, it requires a 'ComplexMetricSerde' to be + // registered even for query time only aggs, but then never uses it. also fun! 
+ try { + BloomKFilter otherFilter = BloomKFilter.deserialize((ByteBuffer) other); + collector.merge(otherFilter); + } + catch (IOException ioe) { + throw new RuntimeException("Failed to deserialize BloomKFilter", ioe); + } + } + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java new file mode 100644 index 000000000000..8dab8676e1ce --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregatorFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.NilColumnValueSelector; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +public class BloomFilterMergeAggregatorFactory extends BloomFilterAggregatorFactory +{ + private final String fieldName; + + BloomFilterMergeAggregatorFactory(String name, String field, Integer maxNumEntries) + { + super(name, null, maxNumEntries); + this.fieldName = field; + } + + @Override + public Aggregator factorize(final ColumnSelectorFactory metricFactory) + { + final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected + if (selector instanceof NilColumnValueSelector) { + throw new ISE("Unexpected NilColumnValueSelector"); + } + return new BloomFilterMergeAggregator((ColumnValueSelector) selector, new BloomKFilter(getMaxNumEntries())); + } + + @Override + public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory) + { + final BaseNullableColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + // null columns should be empty bloom filters by this point, so encountering a nil column in merge agg is unexpected + if (selector instanceof NilColumnValueSelector) { + throw new ISE("Unexpected NilColumnValueSelector");
+ } + return new BloomFilterMergeBufferAggregator((ColumnValueSelector) selector, getMaxNumEntries()); + } + + @Override + public List getRequiredColumns() + { + return Collections.singletonList(new BloomFilterMergeAggregatorFactory(getName(), fieldName, getMaxNumEntries())); + } + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder(AggregatorUtil.BLOOM_FILTER_MERGE_CACHE_TYPE_ID) + .appendString(fieldName) + .appendInt(getMaxNumEntries()) + .build(); + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java new file mode 100644 index 000000000000..026a23e7285d --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeBufferAggregator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.ColumnValueSelector; + +import java.nio.ByteBuffer; + +public final class BloomFilterMergeBufferAggregator extends BaseBloomFilterBufferAggregator<ColumnValueSelector<ByteBuffer>> +{ + public BloomFilterMergeBufferAggregator(ColumnValueSelector<ByteBuffer> selector, int maxNumEntries) + { + super(selector, maxNumEntries); + } + + @Override + public void bufferAdd(ByteBuffer buf) + { + ByteBuffer other = selector.getObject(); + BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position()); + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterSerde.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterSerde.java new file mode 100644 index 000000000000..227fe705ccfc --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterSerde.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.guice.BloomFilterSerializersModule; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.GenericColumnSerializer; +import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.data.ObjectStrategy; +import org.apache.druid.segment.serde.ComplexMetricExtractor; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import java.nio.ByteBuffer; + +/** + * Dummy {@link ComplexMetricSerde} that exists so {@link BloomFilterAggregatorFactory} has something to register so + * {@link org.apache.druid.query.groupby.GroupByQueryEngine} will work, but isn't actually used because bloom filter + * aggregators are currently only implemented for use at query time + */ +public class BloomFilterSerde extends ComplexMetricSerde +{ + @Override + public String getTypeName() + { + return BloomFilterSerializersModule.BLOOM_FILTER_TYPE_NAME; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + throw new UnsupportedOperationException("Bloom filter aggregators are query-time only"); + } + + @Override + public void deserializeColumn(ByteBuffer byteBuffer, ColumnBuilder columnBuilder) + { + throw new UnsupportedOperationException("Bloom filter aggregators are query-time only"); + } + + @Override + public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) + { + throw new UnsupportedOperationException("Bloom filter aggregators are query-time only"); + } + + @Override + public ObjectStrategy getObjectStrategy() + { + throw new UnsupportedOperationException("Bloom filter aggregators are query-time only"); + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java 
b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java new file mode 100644 index 000000000000..dfdae6c20d6e --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterAggregator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; + +public final class DoubleBloomFilterAggregator extends BaseBloomFilterAggregator +{ + DoubleBloomFilterAggregator(BaseDoubleColumnValueSelector selector, BloomKFilter collector) + { + super(selector, collector); + } + + @Override + public void aggregate() + { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + collector.addDouble(selector.getDouble()); + } else { + collector.addBytes(null, 0, 0); + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java new file mode 100644 index 000000000000..e84b9fc70df4 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/DoubleBloomFilterBufferAggregator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; + +import java.nio.ByteBuffer; + +public final class DoubleBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator +{ + DoubleBloomFilterBufferAggregator(BaseDoubleColumnValueSelector selector, int maxNumEntries) + { + super(selector, maxNumEntries); + } + + @Override + public void bufferAdd(ByteBuffer buf) + { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + BloomKFilter.addDouble(buf, selector.getDouble()); + } else { + BloomKFilter.addBytes(buf, null, 0, 0); + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterAggregator.java new file mode 100644 index 000000000000..57df6f2fe2c6 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterAggregator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.NilColumnValueSelector; + +public final class EmptyBloomFilterAggregator extends BaseBloomFilterAggregator +{ + EmptyBloomFilterAggregator(BloomKFilter collector) + { + super(NilColumnValueSelector.instance(), collector); + } + + @Override + public void aggregate() + { + // nothing to do + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterBufferAggregator.java new file mode 100644 index 000000000000..7b6301d37ee9 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/EmptyBloomFilterBufferAggregator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.segment.NilColumnValueSelector; + +import java.nio.ByteBuffer; + +public final class EmptyBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator +{ + EmptyBloomFilterBufferAggregator(int maxNumEntries) + { + super(NilColumnValueSelector.instance(), maxNumEntries); + } + + @Override + public void bufferAdd(ByteBuffer buf) + { + // nothing to do + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + // nothing to do + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java new file mode 100644 index 000000000000..ae53d165b96e --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterAggregator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseFloatColumnValueSelector; + +public final class FloatBloomFilterAggregator extends BaseBloomFilterAggregator +{ + FloatBloomFilterAggregator(BaseFloatColumnValueSelector selector, BloomKFilter collector) + { + super(selector, collector); + } + + @Override + public void aggregate() + { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + collector.addFloat(selector.getFloat()); + } else { + collector.addBytes(null, 0, 0); + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java new file mode 100644 index 000000000000..27e88d48d7a9 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/FloatBloomFilterBufferAggregator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseFloatColumnValueSelector; + +import java.nio.ByteBuffer; + +public final class FloatBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator +{ + FloatBloomFilterBufferAggregator(BaseFloatColumnValueSelector selector, int maxNumEntries) + { + super(selector, maxNumEntries); + } + + @Override + public void bufferAdd(ByteBuffer buf) + { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + BloomKFilter.addFloat(buf, selector.getFloat()); + } else { + BloomKFilter.addBytes(buf, null, 0, 0); + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java new file mode 100644 index 000000000000..caa47397df11 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterAggregator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseLongColumnValueSelector; + +public final class LongBloomFilterAggregator extends BaseBloomFilterAggregator +{ + LongBloomFilterAggregator(BaseLongColumnValueSelector selector, BloomKFilter collector) + { + super(selector, collector); + } + + @Override + public void aggregate() + { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + collector.addLong(selector.getLong()); + } else { + collector.addBytes(null, 0, 0); + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java new file mode 100644 index 000000000000..13a6634cda10 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/LongBloomFilterBufferAggregator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.BaseLongColumnValueSelector; + +import java.nio.ByteBuffer; + +public final class LongBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator +{ + LongBloomFilterBufferAggregator(BaseLongColumnValueSelector selector, int maxNumEntries) + { + super(selector, maxNumEntries); + } + + @Override + public void bufferAdd(ByteBuffer buf) + { + if (NullHandling.replaceWithDefault() || !selector.isNull()) { + BloomKFilter.addLong(buf, selector.getLong()); + } else { + BloomKFilter.addBytes(buf, null, 0, 0); + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java new file mode 100644 index 000000000000..351ef8413a92 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterAggregator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.DimensionSelector; + +public final class StringBloomFilterAggregator extends BaseBloomFilterAggregator +{ + StringBloomFilterAggregator(DimensionSelector selector, BloomKFilter collector) + { + super(selector, collector); + } + + @Override + public void aggregate() + { + // note: there might be room for optimization here but behavior must match BloomDimFilter implementation + if (selector.getRow().size() > 1) { + selector.getRow().forEach(v -> { + String value = selector.lookupName(v); + if (value == null) { + collector.addBytes(null, 0, 0); + } else { + collector.addString(value); + } + }); + } else { + String value = (String) selector.getObject(); + if (value == null) { + collector.addBytes(null, 0, 0); + } else { + collector.addString(value); + } + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java new file mode 100644 index 000000000000..c7c17c940e0d --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/StringBloomFilterBufferAggregator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.segment.DimensionSelector; + +import java.nio.ByteBuffer; + +public final class StringBloomFilterBufferAggregator extends BaseBloomFilterBufferAggregator +{ + + StringBloomFilterBufferAggregator(DimensionSelector selector, int maxNumEntries) + { + super(selector, maxNumEntries); + } + + @Override + public void bufferAdd(ByteBuffer buf) + { + if (selector.getRow().size() > 1) { + selector.getRow().forEach(v -> { + String value = selector.lookupName(v); + if (value == null) { + BloomKFilter.addBytes(buf, null, 0, 0); + } else { + BloomKFilter.addString(buf, value); + } + }); + } else { + String value = (String) selector.getObject(); + if (value == null) { + BloomKFilter.addBytes(buf, null, 0, 0); + } else { + BloomKFilter.addString(buf, value); + } + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java index 29492872966b..12533a20c7cd 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java +++ 
b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java @@ -27,6 +27,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Arrays; /** @@ -38,7 +40,13 @@ * https://github.com/apache/hive/commit/87ce36b458350db141c4cb4b6336a9a01796370f#diff-e65fc506757ee058dc951d15a9a526c3L238 * and this linked issue https://issues.apache.org/jira/browse/HIVE-20101. * - * Todo: remove this and begin using hive-storage-api version again once https://issues.apache.org/jira/browse/HIVE-20893 is released + * Additionally, a handful of methods have been added to work in situ with BloomKFilters that have been serialized to a + * ByteBuffer, e.g. all add and merge methods. Corresponding test methods were not added because they are not currently + * needed, but could be added later to keep the ByteBuffer API symmetrical. + * + * Todo: remove this and begin using hive-storage-api version again once + * https://issues.apache.org/jira/browse/HIVE-20893 is released and if/when static ByteBuffer methods have been merged + * (or alternatively, move them to some sort of utils class) * * begin copy-pasta: * @@ -62,7 +70,7 @@ public class BloomKFilter private static final int DEFAULT_BLOCK_SIZE_BITS = (int) (Math.log(DEFAULT_BLOCK_SIZE) / Math.log(2)); private static final int DEFAULT_BLOCK_OFFSET_MASK = DEFAULT_BLOCK_SIZE - 1; private static final int DEFAULT_BIT_OFFSET_MASK = Long.SIZE - 1; - private final ThreadLocal<byte[]> BYTE_ARRAY_4 = ThreadLocal.withInitial(() -> new byte[4]); + private static final ThreadLocal<byte[]> BYTE_ARRAY_4 = ThreadLocal.withInitial(() -> new byte[4]); private final BitSet bitSet; private final int m; private final int k; @@ -172,6 +180,8 @@ public static BloomKFilter deserialize(InputStream in) throws IOException } } + + // custom Druid ByteBuffer methods start here + /** * Merges BloomKFilter bf2 into bf1.
* Assumes 2 BloomKFilters with the same size/hash functions are serialized to byte arrays @@ -210,6 +220,259 @@ public static void mergeBloomFilterBytes( } } + + public static void serialize(ByteBuffer out, BloomKFilter bloomFilter) + { + serialize(out, out.position(), bloomFilter); + } + + /** + * Serialize a bloom filter to a ByteBuffer. Does not mutate buffer position. + * + * @param out output buffer to write to + * @param position output buffer position + * @param bloomFilter BloomKFilter that needs to be serialized + */ + public static void serialize(ByteBuffer out, int position, BloomKFilter bloomFilter) + { + /** + * Serialized BloomKFilter format: + * 1 byte for the number of hash functions. + * 1 big endian int (to match OutputStream) for the number of longs in the bitset + * big endian longs in the BloomKFilter bitset + */ + ByteBuffer view = out.duplicate().order(ByteOrder.BIG_ENDIAN); + view.position(position); + view.put((byte) bloomFilter.k); + view.putInt(bloomFilter.getBitSet().length); + for (long value : bloomFilter.getBitSet()) { + view.putLong(value); + } + } + + public static BloomKFilter deserialize(ByteBuffer in) throws IOException + { + return deserialize(in, in.position()); + } + + /** + * Deserialize a bloom filter + * Read a byte buffer, which was written by {@linkplain #serialize(OutputStream, BloomKFilter)} or + * {@linkplain #serialize(ByteBuffer, int, BloomKFilter)} + * into a {@code BloomKFilter}. Does not mutate buffer position.
+ * + * @param in input ByteBuffer + * @param position position in the buffer where the serialized BloomKFilter starts + * + * @return deserialized BloomKFilter + */ + public static BloomKFilter deserialize(ByteBuffer in, int position) throws IOException + { + if (in == null) { + throw new IOException("Input buffer is null"); + } + + try { + ByteBuffer dataBuffer = in.duplicate().order(ByteOrder.BIG_ENDIAN); + dataBuffer.position(position); + int numHashFunc = dataBuffer.get(); + int bitsetArrayLen = dataBuffer.getInt(); + long[] data = new long[bitsetArrayLen]; + for (int i = 0; i < bitsetArrayLen; i++) { + data[i] = dataBuffer.getLong(); + } + return new BloomKFilter(data, numHashFunc); + } + catch (RuntimeException e) { + throw new IOException("Unable to deserialize BloomKFilter", e); + } + } + + /** + * Merges BloomKFilter bf2Buffer into bf1Buffer in place. Does not mutate buffer positions. + * Assumes 2 BloomKFilters with the same size/hash functions are serialized to ByteBuffers + * + * @param bf1Buffer buffer containing the serialized BloomKFilter to merge into + * @param bf1Start position of the serialized BloomKFilter within bf1Buffer + * @param bf2Buffer buffer containing the serialized BloomKFilter to merge from + * @param bf2Start position of the serialized BloomKFilter within bf2Buffer + */ + public static void mergeBloomFilterByteBuffers( + ByteBuffer bf1Buffer, + int bf1Start, + ByteBuffer bf2Buffer, + int bf2Start + ) + { + ByteBuffer view1 = bf1Buffer.duplicate().order(ByteOrder.BIG_ENDIAN); + ByteBuffer view2 = bf2Buffer.duplicate().order(ByteOrder.BIG_ENDIAN); + final int bf1Length = START_OF_SERIALIZED_LONGS + (view1.getInt(1 + bf1Start) * Long.BYTES); + final int bf2Length = START_OF_SERIALIZED_LONGS + (view2.getInt(1 + bf2Start) * Long.BYTES); + + if (bf1Length != bf2Length) { + throw new IllegalArgumentException("bf1Length " + bf1Length + " does not match bf2Length " + bf2Length); + } + + // Validation on the bitset size/3 hash functions.
+ for (int idx = 0; idx < START_OF_SERIALIZED_LONGS; ++idx) { + if (view1.get(bf1Start + idx) != view2.get(bf2Start + idx)) { + throw new IllegalArgumentException("bf1 NumHashFunctions/NumBits does not match bf2"); + } + } + + // Just bitwise-OR the bits together - size/# functions should be the same, + // rest of the data is serialized long values for the bitset which are supposed to be bitwise-ORed. + for (int idx = START_OF_SERIALIZED_LONGS; idx < bf1Length; ++idx) { + final int pos1 = bf1Start + idx; + final int pos2 = bf2Start + idx; + view1.put(pos1, (byte) (view1.get(pos1) | view2.get(pos2))); + } + } + + /** + * ByteBuffer based copy of the logic of {@link BloomKFilter#getNumSetBits()} + * @param bfBuffer buffer containing a serialized BloomKFilter + * @param start position of the serialized BloomKFilter within the buffer + * @return number of set bits in the filter's bitset + */ + public static int getNumSetBits(ByteBuffer bfBuffer, int start) + { + ByteBuffer view = bfBuffer.duplicate().order(ByteOrder.BIG_ENDIAN); + view.position(start); + int numLongs = view.getInt(1 + start); + int setBits = 0; + for (int i = 0, pos = START_OF_SERIALIZED_LONGS + start; i < numLongs; i++, pos += Long.BYTES) { + setBits += Long.bitCount(view.getLong(pos)); + } + return setBits; + } + + /** + * Calculate size in bytes of a BloomKFilter for a given number of entries + */ + public static int computeSizeBytes(long maxNumEntries) + { + // copied from constructor + checkArgument(maxNumEntries > 0, "expectedEntries should be > 0"); + long numBits = optimalNumOfBits(maxNumEntries, DEFAULT_FPP); + + int nLongs = (int) Math.ceil((double) numBits / (double) Long.SIZE); + int padLongs = DEFAULT_BLOCK_SIZE - nLongs % DEFAULT_BLOCK_SIZE; + return START_OF_SERIALIZED_LONGS + ((nLongs + padLongs) * Long.BYTES); + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#add(byte[])} that adds a value to the ByteBuffer in place.
+ */ + public static void add(ByteBuffer buffer, byte[] val) + { + addBytes(buffer, val); + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#addBytes(byte[], int, int)} that adds a value to the ByteBuffer + * in place. + */ + public static void addBytes(ByteBuffer buffer, byte[] val, int offset, int length) + { + long hash64 = val == null ? Murmur3.NULL_HASHCODE : + Murmur3.hash64(val, offset, length); + addHash(buffer, hash64); + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#addBytes(byte[])} that adds a value to the ByteBuffer in place. + */ + public static void addBytes(ByteBuffer buffer, byte[] val) + { + addBytes(buffer, val, 0, val.length); + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#addHash(long)} that adds a value to the ByteBuffer in place. + */ + public static void addHash(ByteBuffer buffer, long hash64) + { + final int hash1 = (int) hash64; + final int hash2 = (int) (hash64 >>> 32); + + int firstHash = hash1 + hash2; + // hashcode should be positive, flip all the bits if it's negative + if (firstHash < 0) { + firstHash = ~firstHash; + } + + ByteBuffer view = buffer.duplicate().order(ByteOrder.BIG_ENDIAN); + int startPosition = view.position(); + int numHashFuncs = view.get(startPosition); + int totalBlockCount = view.getInt(startPosition + 1) / DEFAULT_BLOCK_SIZE; + // first hash is used to locate start of the block (blockBaseOffset) + // subsequent K hashes are used to generate K bits within a block of words + final int blockIdx = firstHash % totalBlockCount; + final int blockBaseOffset = blockIdx << DEFAULT_BLOCK_SIZE_BITS; + for (int i = 1; i <= numHashFuncs; i++) { + int combinedHash = hash1 + ((i + 1) * hash2); + // hashcode should be positive, flip all the bits if it's negative + if (combinedHash < 0) { + combinedHash = ~combinedHash; + } + // LSB 3 bits is used to locate offset within the block + final int absOffset = blockBaseOffset + (combinedHash & DEFAULT_BLOCK_OFFSET_MASK); + // Next 6 bits are used to 
locate offset within a long/word + final int bitPos = (combinedHash >>> DEFAULT_BLOCK_SIZE_BITS) & DEFAULT_BIT_OFFSET_MASK; + + final int bufPos = startPosition + START_OF_SERIALIZED_LONGS + (absOffset * Long.BYTES); + view.putLong(bufPos, view.getLong(bufPos) | (1L << bitPos)); + } + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#addString(String)} that adds a value to the ByteBuffer in place. + */ + public static void addString(ByteBuffer buffer, String val) + { + addBytes(buffer, StringUtils.toUtf8(val)); + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#addByte(byte)} that adds a value to the ByteBuffer in place. + */ + public static void addByte(ByteBuffer buffer, byte val) + { + addBytes(buffer, new byte[]{val}); + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#addInt(int)} that adds a value to the ByteBuffer in place. + */ + public static void addInt(ByteBuffer buffer, int val) + { + addBytes(buffer, intToByteArrayLE(val)); + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#addLong(long)} that adds a value to the ByteBuffer in place. + */ + public static void addLong(ByteBuffer buffer, long val) + { + addHash(buffer, Murmur3.hash64(val)); + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#addFloat(float)} that adds a value to the ByteBuffer in place. 
+ */ + public static void addFloat(ByteBuffer buffer, float val) + { + addInt(buffer, Float.floatToIntBits(val)); + } + + /** + * ByteBuffer based copy of {@link BloomKFilter#addDouble(double)} + */ + public static void addDouble(ByteBuffer buffer, double val) + { + addLong(buffer, Double.doubleToLongBits(val)); + } + // custom Druid ByteBuffer methods end here + public void add(byte[] val) { addBytes(val); @@ -381,7 +644,7 @@ public boolean testDouble(double val) return testLong(Double.doubleToLongBits(val)); } - private byte[] intToByteArrayLE(int val) + private static byte[] intToByteArrayLE(int val) { byte[] bytes = BYTE_ARRAY_4.get(); bytes[0] = (byte) (val >> 0); @@ -401,6 +664,15 @@ public int getBitSize() return bitSet.getData().length * Long.SIZE; } + public int getNumSetBits() + { + int setCount = 0; + for (long datum : bitSet.getData()) { + setCount += Long.bitCount(datum); + } + return setCount; + } + public int getNumHashFunctions() { return k; diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java new file mode 100644 index 000000000000..790cf8c5c01d --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorTest.java @@ -0,0 +1,656 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.bloom; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.guice.BloomFilterExtensionModule; +import org.apache.druid.guice.BloomFilterSerializersModule; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorTest; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.dimension.ExtractionDimensionSpec; +import org.apache.druid.query.dimension.RegexFilteredDimensionSpec; +import org.apache.druid.query.extraction.RegexDimExtractionFn; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.DoubleColumnSelector; +import org.apache.druid.segment.FloatColumnSelector; +import org.apache.druid.segment.LongColumnSelector; +import org.junit.Assert; +import org.junit.Test; + +import 
javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
+
+public class BloomFilterAggregatorTest
+{
+  private static final String nullish = NullHandling.replaceWithDefault() ? "" : null;
+  private static final List<String[]> values1 = dimensionValues(
+      "a",
+      "b",
+      "c",
+      "a",
+      "a",
+      nullish,
+      "b",
+      "b",
+      "b",
+      "b",
+      "a",
+      "a"
+  );
+  private static final List<String[]> values2 = dimensionValues(
+      "a",
+      "b",
+      "c",
+      "x",
+      "a",
+      "e",
+      "b",
+      new String[]{nullish, "x"},
+      new String[]{"x", nullish},
+      new String[]{"y", "x"},
+      new String[]{"x", "y"},
+      new String[]{"x", "y", "a"}
+  );
+  private static final Double[] doubleValues1 = new Double[]{0.1, 1.5, 18.3, 0.1};
+  private static final Float[] floatValues1 = new Float[]{0.4f, 0.8f, 23.2f};
+  private static final Long[] longValues1 = new Long[]{10241L, 12312355L, 0L, 81L};
+
+  private static final int maxNumValues = 15;
+
+  private static BloomKFilter filter1;
+  private static BloomKFilter filter2;
+
+  private static String serializedFilter1;
+  private static String serializedFilter2;
+  private static String serializedCombinedFilter;
+  private static String serializedLongFilter;
+  private static String serializedDoubleFilter;
+  private static String serializedFloatFilter;
+
+  static {
+    try {
+      filter1 = new BloomKFilter(maxNumValues);
+      filter2 = new BloomKFilter(maxNumValues);
+      BloomKFilter combinedValuesFilter = new BloomKFilter(maxNumValues);
+
+      createStringFilter(values1, filter1, combinedValuesFilter);
+      createStringFilter(values2, filter2, combinedValuesFilter);
+
+      serializedFilter1 = filterToString(filter1);
+      serializedFilter2 = filterToString(filter2);
+      serializedCombinedFilter = filterToString(combinedValuesFilter);
+
+      BloomKFilter longFilter = new BloomKFilter(maxNumValues);
+      for (long val : longValues1) {
+        longFilter.addLong(val);
+      }
+      
serializedLongFilter = filterToString(longFilter);
+
+      BloomKFilter floatFilter = new BloomKFilter(maxNumValues);
+      for (float val : floatValues1) {
+        floatFilter.addFloat(val);
+      }
+      serializedFloatFilter = filterToString(floatFilter);
+
+      BloomKFilter doubleFilter = new BloomKFilter(maxNumValues);
+      for (double val : doubleValues1) {
+        doubleFilter.addDouble(val);
+      }
+      serializedDoubleFilter = filterToString(doubleFilter);
+
+    }
+    catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private static void createStringFilter(List<String[]> values, BloomKFilter filter, BloomKFilter combinedValuesFilter)
+  {
+    for (String[] vals : values) {
+      for (String val : vals) {
+        if (!NullHandling.replaceWithDefault() && val == null) {
+          filter.addBytes(null, 0, 0);
+          combinedValuesFilter.addBytes(null, 0, 0);
+        } else {
+          filter.addString(NullHandling.nullToEmptyIfNeeded(val));
+          combinedValuesFilter.addString(NullHandling.nullToEmptyIfNeeded(val));
+        }
+      }
+    }
+  }
+
+  private static List<String[]> dimensionValues(Object... 
values)
+  {
+    return Lists.transform(
+        Lists.newArrayList(values), new Function<Object, String[]>()
+        {
+          @Nullable
+          @Override
+          public String[] apply(@Nullable Object input)
+          {
+            if (input instanceof String[]) {
+              return (String[]) input;
+            } else {
+              return new String[]{(String) input};
+            }
+          }
+        }
+    );
+  }
+
+  private static void aggregateDimension(List<DimensionSelector> selectorList, Aggregator agg)
+  {
+    agg.aggregate();
+
+    for (DimensionSelector selector : selectorList) {
+      ((CardinalityAggregatorTest.TestDimensionSelector) selector).increment();
+    }
+  }
+
+  private static void bufferAggregateDimension(
+      List<DimensionSelector> selectorList,
+      BufferAggregator agg,
+      ByteBuffer buf,
+      int pos
+  )
+  {
+    agg.aggregate(buf, pos);
+
+    for (DimensionSelector selector : selectorList) {
+      ((CardinalityAggregatorTest.TestDimensionSelector) selector).increment();
+    }
+  }
+
+  private static void aggregateColumn(List<SteppableSelector> selectorList, Aggregator agg)
+  {
+    agg.aggregate();
+
+    for (SteppableSelector selector : selectorList) {
+      selector.increment();
+    }
+  }
+
+  private static void bufferAggregateColumn(
+      List<SteppableSelector> selectorList,
+      BufferAggregator agg,
+      ByteBuffer buf,
+      int pos
+  )
+  {
+    agg.aggregate(buf, pos);
+
+    for (SteppableSelector selector : selectorList) {
+      selector.increment();
+    }
+  }
+
+  static String filterToString(BloomKFilter bloomKFilter) throws IOException
+  {
+    return StringUtils.encodeBase64String(BloomFilterSerializersModule.bloomKFilterToBytes(bloomKFilter));
+  }
+
+  private final DimensionSpec dimSpec = new DefaultDimensionSpec("dim1", "dim1");
+  private BloomFilterAggregatorFactory valueAggregatorFactory;
+
+  public BloomFilterAggregatorTest()
+  {
+    valueAggregatorFactory = new BloomFilterAggregatorFactory(
+        "billy",
+        dimSpec,
+        maxNumValues
+    );
+  }
+
+
+  @Test
+  public void testAggregateValues() throws IOException
+  {
+    DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values1, null);
+    StringBloomFilterAggregator agg = new StringBloomFilterAggregator(dimSelector, 
new BloomKFilter(maxNumValues)); + + for (int i = 0; i < values1.size(); ++i) { + aggregateDimension(Collections.singletonList(dimSelector), agg); + } + + BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedFilter1, serialized); + } + + @Test + public void testAggregateLongValues() throws IOException + { + TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1)); + LongBloomFilterAggregator agg = new LongBloomFilterAggregator(selector, new BloomKFilter(maxNumValues)); + + for (Long ignored : longValues1) { + aggregateColumn(Collections.singletonList(selector), agg); + } + + BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedLongFilter, serialized); + } + + @Test + public void testAggregateFloatValues() throws IOException + { + TestFloatColumnSelector selector = new TestFloatColumnSelector(Arrays.asList(floatValues1)); + FloatBloomFilterAggregator agg = new FloatBloomFilterAggregator(selector, new BloomKFilter(maxNumValues)); + + for (Float ignored : floatValues1) { + aggregateColumn(Collections.singletonList(selector), agg); + } + + BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get()); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedFloatFilter, serialized); + } + + @Test + public void testAggregateDoubleValues() throws IOException + { + TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1)); + DoubleBloomFilterAggregator agg = new DoubleBloomFilterAggregator(selector, new BloomKFilter(maxNumValues)); + + for (Double ignored : doubleValues1) { + aggregateColumn(Collections.singletonList(selector), agg); + } + + BloomKFilter bloomKFilter = (BloomKFilter) 
valueAggregatorFactory.finalizeComputation(agg.get()); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedDoubleFilter, serialized); + } + + @Test + public void testBufferAggregateStringValues() throws IOException + { + DimensionSelector dimSelector = new CardinalityAggregatorTest.TestDimensionSelector(values2, null); + StringBloomFilterBufferAggregator agg = new StringBloomFilterBufferAggregator(dimSelector, maxNumValues); + + int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); + ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); + int pos = 10; + buf.limit(pos + maxSize); + + agg.init(buf, pos); + + for (int i = 0; i < values2.size(); ++i) { + bufferAggregateDimension(Collections.singletonList(dimSelector), agg, buf, pos); + } + BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedFilter2, serialized); + } + + @Test + public void testBufferAggregateLongValues() throws IOException + { + TestLongColumnSelector selector = new TestLongColumnSelector(Arrays.asList(longValues1)); + LongBloomFilterBufferAggregator agg = new LongBloomFilterBufferAggregator(selector, maxNumValues); + + int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); + ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); + int pos = 10; + buf.limit(pos + maxSize); + + agg.init(buf, pos); + + IntStream.range(0, longValues1.length) + .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); + BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedLongFilter, serialized); + } + + @Test + public void testBufferAggregateFloatValues() throws IOException + { + TestFloatColumnSelector selector = new 
TestFloatColumnSelector(Arrays.asList(floatValues1)); + FloatBloomFilterBufferAggregator agg = new FloatBloomFilterBufferAggregator(selector, maxNumValues); + + int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); + ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); + int pos = 10; + buf.limit(pos + maxSize); + + agg.init(buf, pos); + + IntStream.range(0, floatValues1.length) + .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); + BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedFloatFilter, serialized); + } + + @Test + public void testBufferAggregateDoubleValues() throws IOException + { + TestDoubleColumnSelector selector = new TestDoubleColumnSelector(Arrays.asList(doubleValues1)); + DoubleBloomFilterBufferAggregator agg = new DoubleBloomFilterBufferAggregator(selector, maxNumValues); + + int maxSize = valueAggregatorFactory.getMaxIntermediateSizeWithNulls(); + ByteBuffer buf = ByteBuffer.allocate(maxSize + 64); + int pos = 10; + buf.limit(pos + maxSize); + + agg.init(buf, pos); + + IntStream.range(0, doubleValues1.length) + .forEach(i -> bufferAggregateColumn(Collections.singletonList(selector), agg, buf, pos)); + BloomKFilter bloomKFilter = (BloomKFilter) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)); + String serialized = filterToString(bloomKFilter); + Assert.assertEquals(serializedDoubleFilter, serialized); + } + + @Test + public void testCombineValues() throws IOException + { + DimensionSelector dimSelector1 = new CardinalityAggregatorTest.TestDimensionSelector(values1, null); + DimensionSelector dimSelector2 = new CardinalityAggregatorTest.TestDimensionSelector(values2, null); + + StringBloomFilterAggregator agg1 = new StringBloomFilterAggregator(dimSelector1, new BloomKFilter(maxNumValues)); + StringBloomFilterAggregator agg2 = new 
StringBloomFilterAggregator(dimSelector2, new BloomKFilter(maxNumValues)); + + for (int i = 0; i < values1.size(); ++i) { + aggregateDimension(Collections.singletonList(dimSelector1), agg1); + } + for (int i = 0; i < values2.size(); ++i) { + aggregateDimension(Collections.singletonList(dimSelector2), agg2); + } + + BloomKFilter combined = (BloomKFilter) valueAggregatorFactory.finalizeComputation( + valueAggregatorFactory.combine( + agg1.get(), + agg2.get() + ) + ); + + String serialized = filterToString(combined); + Assert.assertEquals(serializedCombinedFilter, serialized); + } + + @Test + public void testMergeValues() throws IOException + { + final TestBloomFilterColumnSelector mergeDim = + new TestBloomFilterColumnSelector(ImmutableList.of(filter1, filter2)); + + BloomFilterMergeAggregator mergeAggregator = + new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues)); + + for (int i = 0; i < 2; ++i) { + aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator); + } + + + BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory() + .finalizeComputation(mergeAggregator.get()); + String serialized = filterToString(merged); + Assert.assertEquals(serializedCombinedFilter, serialized); + } + + @Test + public void testMergeValuesWithBuffersForGroupByV1() throws IOException + { + final TestBloomFilterColumnSelector mergeDim = + new TestBloomFilterColumnSelector( + ImmutableList.of( + ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)), + ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2)) + ) + ); + + BloomFilterMergeAggregator mergeAggregator = + new BloomFilterMergeAggregator(mergeDim, new BloomKFilter(maxNumValues)); + + for (int i = 0; i < 2; ++i) { + aggregateColumn(Collections.singletonList(mergeDim), mergeAggregator); + } + + + BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory() + .finalizeComputation(mergeAggregator.get()); + String 
serialized = filterToString(merged);
+    Assert.assertEquals(serializedCombinedFilter, serialized);
+  }
+
+  @Test
+  public void testBufferMergeValues() throws IOException
+  {
+    final TestBloomFilterBufferColumnSelector mergeDim =
+        new TestBloomFilterBufferColumnSelector(
+            ImmutableList.of(
+                ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter1)),
+                ByteBuffer.wrap(BloomFilterSerializersModule.bloomKFilterToBytes(filter2))
+            )
+        );
+
+    BloomFilterMergeBufferAggregator mergeAggregator = new BloomFilterMergeBufferAggregator(mergeDim, maxNumValues);
+
+    int maxSize = valueAggregatorFactory.getCombiningFactory().getMaxIntermediateSizeWithNulls();
+    ByteBuffer buf = ByteBuffer.allocate(maxSize + 64);
+    int pos = 10;
+    buf.limit(pos + maxSize);
+
+    mergeAggregator.init(buf, pos);
+
+    for (int i = 0; i < 2; ++i) {
+      bufferAggregateColumn(Collections.singletonList(mergeDim), mergeAggregator, buf, pos);
+    }
+
+    BloomKFilter merged = (BloomKFilter) valueAggregatorFactory.getCombiningFactory()
+                                                               .finalizeComputation(mergeAggregator.get(buf, pos));
+    String serialized = filterToString(merged);
+
+    Assert.assertEquals(serializedCombinedFilter, serialized);
+  }
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    BloomFilterAggregatorFactory factory = new BloomFilterAggregatorFactory(
+        "billy",
+        new DefaultDimensionSpec("b", "b"),
+        maxNumValues
+    );
+    ObjectMapper objectMapper = new DefaultObjectMapper();
+    new BloomFilterExtensionModule().getJacksonModules().forEach(objectMapper::registerModule);
+    Assert.assertEquals(
+        factory,
+        objectMapper.readValue(objectMapper.writeValueAsString(factory), AggregatorFactory.class)
+    );
+
+    String fieldNamesOnly = "{"
+                            + "\"type\":\"bloom\","
+                            + "\"name\":\"billy\","
+                            + "\"field\":\"b\","
+                            + "\"maxNumEntries\":15"
+                            + "}";
+    Assert.assertEquals(
+        factory,
+        objectMapper.readValue(fieldNamesOnly, AggregatorFactory.class)
+    );
+
+    BloomFilterAggregatorFactory factory2 = new BloomFilterAggregatorFactory(
+        
"billy", + new ExtractionDimensionSpec("b", "b", new RegexDimExtractionFn(".*", false, null)), + maxNumValues + ); + + Assert.assertEquals( + factory2, + objectMapper.readValue(objectMapper.writeValueAsString(factory2), AggregatorFactory.class) + ); + + BloomFilterAggregatorFactory factory3 = new BloomFilterAggregatorFactory( + "billy", + new RegexFilteredDimensionSpec(new DefaultDimensionSpec("a", "a"), ".*"), + maxNumValues + ); + Assert.assertEquals( + factory3, + objectMapper.readValue(objectMapper.writeValueAsString(factory3), AggregatorFactory.class) + ); + } + + private abstract static class SteppableSelector implements ColumnValueSelector + { + List values; + int pos; + + public SteppableSelector(List values) + { + this.values = values; + this.pos = 0; + } + + @Nullable + @Override + public T getObject() + { + return values.get(pos); + } + + public void increment() + { + pos++; + } + + public void reset() + { + pos = 0; + } + + + @Override + public double getDouble() + { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException(); + } + + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + + @Override + public Class classOfObject() + { + return null; + } + + @Override + public boolean isNull() + { + return false; + } + } + + public static class TestBloomFilterColumnSelector extends SteppableSelector + { + public TestBloomFilterColumnSelector(List values) + { + super(values); + } + } + + public static class TestBloomFilterBufferColumnSelector extends SteppableSelector + { + public TestBloomFilterBufferColumnSelector(List values) + { + super(values); + } + } + + public static class TestLongColumnSelector extends SteppableSelector implements LongColumnSelector + { + public TestLongColumnSelector(List values) + { + super(values); + } + + @Override + public 
long getLong() + { + return values.get(pos); + } + } + + public static class TestFloatColumnSelector extends SteppableSelector implements FloatColumnSelector + { + public TestFloatColumnSelector(List values) + { + super(values); + } + + @Override + public float getFloat() + { + return values.get(pos); + } + } + + public static class TestDoubleColumnSelector extends SteppableSelector implements DoubleColumnSelector + { + public TestDoubleColumnSelector(List values) + { + super(values); + } + + @Override + public double getDouble() + { + return values.get(pos); + } + } +} diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java new file mode 100644 index 000000000000..a2207f2e6423 --- /dev/null +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/BloomFilterGroupByQueryTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.bloom; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.inject.Guice; +import com.google.inject.Key; +import org.apache.druid.data.input.MapBasedRow; +import org.apache.druid.guice.BloomFilterExtensionModule; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.aggregation.AggregationTestHelper; +import org.apache.druid.query.filter.BloomKFilter; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryRunnerTest; +import org.apache.druid.segment.TestHelper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +@RunWith(Parameterized.class) +public class BloomFilterGroupByQueryTest +{ + private static final BloomFilterExtensionModule module = new BloomFilterExtensionModule(); + + static { + // throwaway, just using to properly initialize jackson modules + Guice.createInjector( + binder -> binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper()), + module + ); + } + + private AggregationTestHelper helper; + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + public BloomFilterGroupByQueryTest(final GroupByQueryConfig config) + { + helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + Lists.newArrayList(module.getJacksonModules()), + config, + tempFolder + ); + } + + @Parameterized.Parameters(name = "{0}") + public static Collection constructorFeeder() + { + final List constructors = new 
ArrayList<>(); + for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { + constructors.add(new Object[]{config}); + } + return constructors; + } + + @After + public void teardown() throws IOException + { + helper.close(); + } + + @Test + public void testQuery() throws Exception + { + + String query = "{" + + "\"queryType\": \"groupBy\"," + + "\"dataSource\": \"test_datasource\"," + + "\"granularity\": \"ALL\"," + + "\"dimensions\": []," + + "\"filter\":{ \"type\":\"selector\", \"dimension\":\"market\", \"value\":\"upfront\"}," + + "\"aggregations\": [" + + " { \"type\": \"bloom\", \"name\": \"blooming_quality\", \"field\": \"quality\" }" + + "]," + + "\"intervals\": [ \"1970/2050\" ]" + + "}"; + + MapBasedRow row = ingestAndQuery(query); + + + Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("mezzanine")); + Assert.assertTrue(((BloomKFilter) row.getRaw("blooming_quality")).testString("premium")); + Assert.assertFalse(((BloomKFilter) row.getRaw("blooming_quality")).testString("entertainment")); + } + + @Test + public void testQueryFakeDimension() throws Exception + { + String query = "{" + + "\"queryType\": \"groupBy\"," + + "\"dataSource\": \"test_datasource\"," + + "\"granularity\": \"ALL\"," + + "\"dimensions\": []," + + "\"filter\":{ \"type\":\"selector\", \"dimension\":\"market\", \"value\":\"upfront\"}," + + "\"aggregations\": [" + + " { \"type\": \"bloom\", \"name\": \"blooming_quality\", \"field\": \"nope\" }" + + "]," + + "\"intervals\": [ \"1970/2050\" ]" + + "}"; + + MapBasedRow row = ingestAndQuery(query); + + // a nil column results in a totally empty bloom filter + BloomKFilter filter = new BloomKFilter(1500); + + Object val = row.getRaw("blooming_quality"); + + String serialized = BloomFilterAggregatorTest.filterToString((BloomKFilter) val); + String empty = BloomFilterAggregatorTest.filterToString(filter); + + Assert.assertEquals(empty, serialized); + } + + private MapBasedRow ingestAndQuery(String query) 
throws Exception
+  {
+    String metricSpec = "[{ \"type\": \"count\", \"name\": \"count\"}]";
+
+    String parseSpec = "{"
+                       + "\"type\" : \"string\","
+                       + "\"parseSpec\" : {"
+                       + " \"format\" : \"tsv\","
+                       + " \"timestampSpec\" : {"
+                       + " \"column\" : \"timestamp\","
+                       + " \"format\" : \"auto\""
+                       + "},"
+                       + " \"dimensionsSpec\" : {"
+                       + " \"dimensions\": [],"
+                       + " \"dimensionExclusions\" : [],"
+                       + " \"spatialDimensions\" : []"
+                       + " },"
+                       + " \"columns\": [\"timestamp\", \"market\", \"quality\", \"placement\", \"placementish\", \"index\"]"
+                       + " }"
+                       + "}";
+
+    Sequence seq = helper.createIndexAndRunQueryOnSegment(
+        this.getClass().getClassLoader().getResourceAsStream("sample.data.tsv"),
+        parseSpec,
+        metricSpec,
+        0,
+        Granularities.NONE,
+        50000,
+        query
+    );
+
+    return (MapBasedRow) seq.toList().get(0);
+  }
+}
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomKFilterTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomKFilterTest.java
new file mode 100644
index 000000000000..3385924e4e1a
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomKFilterTest.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import org.apache.druid.io.ByteBufferInputStream;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+
+public class BloomKFilterTest
+{
+  private static final int COUNT = 100;
+  Random rand = ThreadLocalRandom.current();
+
+  @Test
+  public void testBloomKFilterBytes() throws IOException
+  {
+    BloomKFilter bf = new BloomKFilter(10000);
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomKFilter.serialize(bytesOut, bf);
+    byte[] bfBytes = bytesOut.toByteArray();
+    ByteBuffer buffer = ByteBuffer.wrap(bfBytes);
+
+    byte[] val = new byte[]{1, 2, 3};
+    byte[] val1 = new byte[]{1, 2, 3, 4};
+    byte[] val2 = new byte[]{1, 2, 3, 4, 5};
+    byte[] val3 = new byte[]{1, 2, 3, 4, 5, 6};
+
+
+    bf.add(val);
+    BloomKFilter.add(buffer, val);
+    BloomKFilter rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.test(val));
+    assertEquals(false, rehydrated.test(val1));
+    assertEquals(false, rehydrated.test(val2));
+    assertEquals(false, rehydrated.test(val3));
+    BloomKFilter.add(buffer, val1);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.test(val));
+    assertEquals(true, rehydrated.test(val1));
+    assertEquals(false, rehydrated.test(val2));
+    assertEquals(false, rehydrated.test(val3));
+    BloomKFilter.add(buffer, val2);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.test(val));
+    assertEquals(true, rehydrated.test(val1));
+    assertEquals(true, rehydrated.test(val2));
+    assertEquals(false, rehydrated.test(val3));
+    BloomKFilter.add(buffer, val3);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.test(val));
+    assertEquals(true, rehydrated.test(val1));
+    assertEquals(true, rehydrated.test(val2));
+    assertEquals(true, rehydrated.test(val3));
+
+    byte[] randVal = new byte[COUNT];
+    for (int i = 0; i < COUNT; i++) {
+      rand.nextBytes(randVal);
+      BloomKFilter.add(buffer, randVal);
+    }
+    // last value should be present
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    assertEquals(true, rehydrated.test(randVal));
+    // most likely this value should not exist
+    randVal[0] = 0;
+    randVal[1] = 0;
+    randVal[2] = 0;
+    randVal[3] = 0;
+    randVal[4] = 0;
+    assertEquals(false, rehydrated.test(randVal));
+
+    assertEquals(7808, rehydrated.sizeInBytes());
+  }
+
+  @Test
+  public void testBloomKFilterByte() throws IOException
+  {
+    BloomKFilter bf = new BloomKFilter(10000);
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomKFilter.serialize(bytesOut, bf);
+    byte[] bfBytes = bytesOut.toByteArray();
+    ByteBuffer buffer = ByteBuffer.wrap(bfBytes);
+
+    byte val = Byte.MIN_VALUE;
+    byte val1 = 1;
+    byte val2 = 2;
+    byte val3 = Byte.MAX_VALUE;
+
+    BloomKFilter.addLong(buffer, val);
+    BloomKFilter rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(false, rehydrated.testLong(val1));
+    assertEquals(false, rehydrated.testLong(val2));
+    assertEquals(false, rehydrated.testLong(val3));
+    BloomKFilter.addLong(buffer, val1);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(true, rehydrated.testLong(val1));
+    assertEquals(false, rehydrated.testLong(val2));
+    assertEquals(false, rehydrated.testLong(val3));
+    BloomKFilter.addLong(buffer, val2);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(true, rehydrated.testLong(val1));
+    assertEquals(true, rehydrated.testLong(val2));
+    assertEquals(false, rehydrated.testLong(val3));
+    BloomKFilter.addLong(buffer, val3);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(true, rehydrated.testLong(val1));
+    assertEquals(true, rehydrated.testLong(val2));
+    assertEquals(true, rehydrated.testLong(val3));
+
+    byte randVal = 0;
+    for (int i = 0; i < COUNT; i++) {
+      randVal = (byte) rand.nextInt(Byte.MAX_VALUE);
+      BloomKFilter.addLong(buffer, randVal);
+    }
+
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+
+    // last value should be present
+    assertEquals(true, rehydrated.testLong(randVal));
+    // most likely this value should not exist
+    assertEquals(false, rehydrated.testLong((byte) -120));
+
+    assertEquals(7808, rehydrated.sizeInBytes());
+  }
+
+  @Test
+  public void testBloomKFilterInt() throws IOException
+  {
+    BloomKFilter bf = new BloomKFilter(10000);
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomKFilter.serialize(bytesOut, bf);
+    byte[] bfBytes = bytesOut.toByteArray();
+    ByteBuffer buffer = ByteBuffer.wrap(bfBytes);
+
+    int val = Integer.MIN_VALUE;
+    int val1 = 1;
+    int val2 = 2;
+    int val3 = Integer.MAX_VALUE;
+
+    BloomKFilter.addLong(buffer, val);
+    BloomKFilter rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(false, rehydrated.testLong(val1));
+    assertEquals(false, rehydrated.testLong(val2));
+    assertEquals(false, rehydrated.testLong(val3));
+    BloomKFilter.addLong(buffer, val1);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(true, rehydrated.testLong(val1));
+    assertEquals(false, rehydrated.testLong(val2));
+    assertEquals(false, rehydrated.testLong(val3));
+    BloomKFilter.addLong(buffer, val2);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(true, rehydrated.testLong(val1));
+    assertEquals(true, rehydrated.testLong(val2));
+    assertEquals(false, rehydrated.testLong(val3));
+    BloomKFilter.addLong(buffer, val3);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(true, rehydrated.testLong(val1));
+    assertEquals(true, rehydrated.testLong(val2));
+    assertEquals(true, rehydrated.testLong(val3));
+
+    int randVal = 0;
+    for (int i = 0; i < COUNT; i++) {
+      randVal = rand.nextInt();
+      BloomKFilter.addLong(buffer, randVal);
+    }
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    // last value should be present
+    assertEquals(true, rehydrated.testLong(randVal));
+    // most likely this value should not exist
+    assertEquals(false, rehydrated.testLong(-120));
+
+    assertEquals(7808, rehydrated.sizeInBytes());
+  }
+
+  @Test
+  public void testBloomKFilterLong() throws IOException
+  {
+    BloomKFilter bf = new BloomKFilter(10000);
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomKFilter.serialize(bytesOut, bf);
+    byte[] bfBytes = bytesOut.toByteArray();
+    ByteBuffer buffer = ByteBuffer.wrap(bfBytes);
+
+    long val = Long.MIN_VALUE;
+    long val1 = 1;
+    long val2 = 2;
+    long val3 = Long.MAX_VALUE;
+
+    BloomKFilter.addLong(buffer, val);
+    BloomKFilter rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(false, rehydrated.testLong(val1));
+    assertEquals(false, rehydrated.testLong(val2));
+    assertEquals(false, rehydrated.testLong(val3));
+    BloomKFilter.addLong(buffer, val1);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(true, rehydrated.testLong(val1));
+    assertEquals(false, rehydrated.testLong(val2));
+    assertEquals(false, rehydrated.testLong(val3));
+    BloomKFilter.addLong(buffer, val2);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(true, rehydrated.testLong(val1));
+    assertEquals(true, rehydrated.testLong(val2));
+    assertEquals(false, rehydrated.testLong(val3));
+    BloomKFilter.addLong(buffer, val3);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testLong(val));
+    assertEquals(true, rehydrated.testLong(val1));
+    assertEquals(true, rehydrated.testLong(val2));
+    assertEquals(true, rehydrated.testLong(val3));
+
+    int randVal = 0;
+    for (int i = 0; i < COUNT; i++) {
+      randVal = rand.nextInt();
+      BloomKFilter.addLong(buffer, randVal);
+    }
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    // last value should be present
+    assertEquals(true, rehydrated.testLong(randVal));
+    // most likely this value should not exist
+    assertEquals(false, rehydrated.testLong(-120));
+
+    assertEquals(7808, rehydrated.sizeInBytes());
+  }
+
+  @Test
+  public void testBloomKFilterFloat() throws IOException
+  {
+    BloomKFilter bf = new BloomKFilter(10000);
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomKFilter.serialize(bytesOut, bf);
+    byte[] bfBytes = bytesOut.toByteArray();
+    ByteBuffer buffer = ByteBuffer.wrap(bfBytes);
+
+    float val = Float.NEGATIVE_INFINITY;
+    float val1 = 1.1f;
+    float val2 = 2.2f;
+    float val3 = Float.POSITIVE_INFINITY;
+
+    BloomKFilter.addFloat(buffer, val);
+    BloomKFilter rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testFloat(val));
+    assertEquals(false, rehydrated.testFloat(val1));
+    assertEquals(false, rehydrated.testFloat(val2));
+    assertEquals(false, rehydrated.testFloat(val3));
+    BloomKFilter.addFloat(buffer, val1);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testFloat(val));
+    assertEquals(true, rehydrated.testFloat(val1));
+    assertEquals(false, rehydrated.testFloat(val2));
+    assertEquals(false, rehydrated.testFloat(val3));
+    BloomKFilter.addFloat(buffer, val2);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testFloat(val));
+    assertEquals(true, rehydrated.testFloat(val1));
+    assertEquals(true, rehydrated.testFloat(val2));
+    assertEquals(false, rehydrated.testFloat(val3));
+    BloomKFilter.addFloat(buffer, val3);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testFloat(val));
+    assertEquals(true, rehydrated.testFloat(val1));
+    assertEquals(true, rehydrated.testFloat(val2));
+    assertEquals(true, rehydrated.testFloat(val3));
+
+    float randVal = 0;
+    for (int i = 0; i < COUNT; i++) {
+      randVal = rand.nextFloat();
+      BloomKFilter.addFloat(buffer, randVal);
+    }
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+
+    // last value should be present
+    assertEquals(true, rehydrated.testFloat(randVal));
+    // most likely this value should not exist
+    assertEquals(false, rehydrated.testFloat(-120.2f));
+
+    assertEquals(7808, rehydrated.sizeInBytes());
+  }
+
+  @Test
+  public void testBloomKFilterDouble() throws IOException
+  {
+    BloomKFilter bf = new BloomKFilter(10000);
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomKFilter.serialize(bytesOut, bf);
+    byte[] bfBytes = bytesOut.toByteArray();
+    ByteBuffer buffer = ByteBuffer.wrap(bfBytes);
+
+    double val = Double.NEGATIVE_INFINITY;
+    double val1 = 1.1d;
+    double val2 = 2.2d;
+    double val3 = Double.POSITIVE_INFINITY;
+
+    BloomKFilter.addDouble(buffer, val);
+    BloomKFilter rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testDouble(val));
+    assertEquals(false, rehydrated.testDouble(val1));
+    assertEquals(false, rehydrated.testDouble(val2));
+    assertEquals(false, rehydrated.testDouble(val3));
+    BloomKFilter.addDouble(buffer, val1);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testDouble(val));
+    assertEquals(true, rehydrated.testDouble(val1));
+    assertEquals(false, rehydrated.testDouble(val2));
+    assertEquals(false, rehydrated.testDouble(val3));
+    BloomKFilter.addDouble(buffer, val2);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testDouble(val));
+    assertEquals(true, rehydrated.testDouble(val1));
+    assertEquals(true, rehydrated.testDouble(val2));
+    assertEquals(false, rehydrated.testDouble(val3));
+    BloomKFilter.addDouble(buffer, val3);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testDouble(val));
+    assertEquals(true, rehydrated.testDouble(val1));
+    assertEquals(true, rehydrated.testDouble(val2));
+    assertEquals(true, rehydrated.testDouble(val3));
+
+    double randVal = 0;
+    for (int i = 0; i < COUNT; i++) {
+      randVal = rand.nextDouble();
+      BloomKFilter.addDouble(buffer, randVal);
+    }
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+
+    // last value should be present
+    assertEquals(true, rehydrated.testDouble(randVal));
+    // most likely this value should not exist
+    assertEquals(false, rehydrated.testDouble(-120.2d));
+
+    assertEquals(7808, rehydrated.sizeInBytes());
+  }
+
+  @Test
+  public void testBloomKFilterString() throws IOException
+  {
+    BloomKFilter bf = new BloomKFilter(100000);
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomKFilter.serialize(bytesOut, bf);
+    byte[] bfBytes = bytesOut.toByteArray();
+    ByteBuffer buffer = ByteBuffer.wrap(bfBytes);
+
+    String val = "bloo";
+    String val1 = "bloom fil";
+    String val2 = "bloom filter";
+    String val3 = "cuckoo filter";
+
+    BloomKFilter.addString(buffer, val);
+    BloomKFilter rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testString(val));
+    assertEquals(false, rehydrated.testString(val1));
+    assertEquals(false, rehydrated.testString(val2));
+    assertEquals(false, rehydrated.testString(val3));
+    BloomKFilter.addString(buffer, val1);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testString(val));
+    assertEquals(true, rehydrated.testString(val1));
+    assertEquals(false, rehydrated.testString(val2));
+    assertEquals(false, rehydrated.testString(val3));
+    BloomKFilter.addString(buffer, val2);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testString(val));
+    assertEquals(true, rehydrated.testString(val1));
+    assertEquals(true, rehydrated.testString(val2));
+    assertEquals(false, rehydrated.testString(val3));
+    BloomKFilter.addString(buffer, val3);
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    buffer.position(0);
+    assertEquals(true, rehydrated.testString(val));
+    assertEquals(true, rehydrated.testString(val1));
+    assertEquals(true, rehydrated.testString(val2));
+    assertEquals(true, rehydrated.testString(val3));
+
+    long randVal = 0;
+    for (int i = 0; i < COUNT; i++) {
+      randVal = rand.nextLong();
+      BloomKFilter.addString(buffer, Long.toString(randVal));
+    }
+    rehydrated = BloomKFilter.deserialize(new ByteBufferInputStream(buffer));
+    // last value should be present
+    assertEquals(true, rehydrated.testString(Long.toString(randVal)));
+    // most likely this value should not exist
+    assertEquals(false, rehydrated.testString(Long.toString(-120)));
+
+    assertEquals(77952, rehydrated.sizeInBytes());
+  }
+
+  @Test
+  public void testMergeBloomKFilterByteBuffers() throws Exception
+  {
+    BloomKFilter bf1 = new BloomKFilter(10000);
+    BloomKFilter bf2 = new BloomKFilter(10000);
+
+    String[] inputs1 = {
+        "bloo",
+        "bloom fil",
+        "bloom filter",
+        "cuckoo filter",
+    };
+
+    String[] inputs2 = {
+        "2_bloo",
+        "2_bloom fil",
+        "2_bloom filter",
+        "2_cuckoo filter",
+    };
+
+    for (String val : inputs1) {
+      bf1.addString(val);
+    }
+    for (String val : inputs2) {
+      bf2.addString(val);
+    }
+
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomKFilter.serialize(bytesOut, bf1);
+    byte[] bf1Bytes = bytesOut.toByteArray();
+    bytesOut.reset();
+    BloomKFilter.serialize(bytesOut, bf2);
+    byte[] bf2Bytes = bytesOut.toByteArray();
+
+    ByteBuffer buf1 = ByteBuffer.wrap(bf1Bytes);
+    ByteBuffer buf2 = ByteBuffer.wrap(bf2Bytes);
+
+    // Merge bytes
+    BloomKFilter.mergeBloomFilterByteBuffers(
+        buf1,
+        0,
+        buf2,
+        0
+    );
+
+    // Deserialize and test
+    byte[] merged = new byte[bf1Bytes.length];
+    buf1.get(merged, 0, bf1Bytes.length);
+
+    ByteArrayInputStream bytesIn = new ByteArrayInputStream(merged, 0, bf1Bytes.length);
+    BloomKFilter bfMerged = BloomKFilter.deserialize(bytesIn);
+    // All values should pass test
+    for (String val : inputs1) {
+      assert bfMerged.testString(val);
+    }
+    for (String val : inputs2) {
+      assert bfMerged.testString(val);
+    }
+  }
+
+  @Test
+  public void testCountBitBloomKFilterByteBuffersEmpty() throws Exception
+  {
+    BloomKFilter bfWithValues = new BloomKFilter(10000);
+    BloomKFilter bfEmpty = new BloomKFilter(10000);
+    BloomKFilter bfNull = new BloomKFilter(10000);
+
+    for (int i = 0; i < 1000; i++) {
+      bfWithValues.addInt(rand.nextInt());
+    }
+
+    bfNull.addBytes(null, 0, 0);
+
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    BloomKFilter.serialize(bytesOut, bfWithValues);
+    ByteBuffer bufWithValues = ByteBuffer.wrap(bytesOut.toByteArray());
+    bytesOut.reset();
+    BloomKFilter.serialize(bytesOut, bfEmpty);
+    ByteBuffer bufEmpty = ByteBuffer.wrap(bytesOut.toByteArray());
+    bytesOut.reset();
+    BloomKFilter.serialize(bytesOut, bfNull);
+    ByteBuffer bufWithNull = ByteBuffer.wrap(bytesOut.toByteArray());
+
+
+    Assert.assertTrue(BloomKFilter.getNumSetBits(bufWithValues, 0) > 0);
+    Assert.assertFalse(BloomKFilter.getNumSetBits(bufEmpty, 0) > 0);
+    Assert.assertTrue(BloomKFilter.getNumSetBits(bufWithNull, 0) > 0);
+    Assert.assertTrue(
+        BloomKFilter.getNumSetBits(bufWithValues, 0) > BloomKFilter.getNumSetBits(bufWithNull, 0)
+    );
+  }
+}
diff --git a/extensions-core/druid-bloom-filter/src/test/resources/sample.data.tsv b/extensions-core/druid-bloom-filter/src/test/resources/sample.data.tsv
new file mode 100644
index 000000000000..674d86cefe9f
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/test/resources/sample.data.tsv
@@ -0,0 +1,13 @@
+2011-04-15T00:00:00.000Z	spot	automotive	preferred	apreferred	106.793700
+2011-04-15T00:00:00.000Z	spot	business	preferred	bpreferred	94.469747
+2011-04-15T00:00:00.000Z	spot	entertainment	preferred	epreferred	135.109191
+2011-04-15T00:00:00.000Z	spot	health	preferred	hpreferred	99.596909
+2011-04-15T00:00:00.000Z	spot	mezzanine	preferred	mpreferred	92.782760
+2011-04-15T00:00:00.000Z	spot	news	preferred	npreferred
+2011-04-15T00:00:00.000Z	spot	premium	preferred	ppreferred
+2011-04-15T00:00:00.000Z	spot	technology	preferred	tpreferred
+2011-04-15T00:00:00.000Z	spot	travel	preferred	tpreferred
+2011-04-15T00:00:00.000Z	total_market	mezzanine	preferred	mpreferred
+2011-04-15T00:00:00.000Z	total_market	premium	preferred	ppreferred
+2011-04-15T00:00:00.000Z	upfront	mezzanine	preferred	mpreferred
+2011-04-15T00:00:00.000Z	upfront	premium	preferred	ppreferred
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
index 32003c6e61c4..19cf467b5467 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
@@ -113,6 +113,10 @@ public class AggregatorUtil
   // Fixed buckets histogram aggregator
   public static final byte FIXED_BUCKET_HIST_CACHE_TYPE_ID = 0x33;
 
+  // bloom filter extension
+  public static final byte BLOOM_FILTER_CACHE_TYPE_ID = 0x34;
+  public static final byte BLOOM_FILTER_MERGE_CACHE_TYPE_ID = 0x35;
+
   /**
    * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
    *
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
index abb157738d4d..875cd3d8be82 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.aggregation.cardinality;
 
 import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.ColumnSelectorPlus;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
@@ -112,6 +113,8 @@ public void close()
   @Override
   public void inspectRuntimeShape(RuntimeShapeInspector inspector)
   {
-    inspector.visit("selectorPluses", selectorPluses);
+    for (int i = 0; i < selectorPluses.length; i++) {
+      inspector.visit(StringUtils.format("selector-%d", i), selectorPluses[i].getSelector());
+    }
   }
 }