diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java index 21af16d649f7..011e091d3dd1 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/guice/BloomFilterSerializersModule.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; @@ -28,6 +27,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.StdSerializer; import org.apache.druid.query.filter.BloomDimFilter; +import org.apache.druid.query.filter.BloomKFilterHolder; import org.apache.hive.common.util.BloomKFilter; import java.io.ByteArrayInputStream; @@ -45,12 +45,13 @@ public BloomFilterSerializersModule() ); addSerializer(BloomKFilter.class, new BloomKFilterSerializer()); addDeserializer(BloomKFilter.class, new BloomKFilterDeserializer()); + addDeserializer(BloomKFilterHolder.class, new BloomKFilterHolderDeserializer()); } - public static class BloomKFilterSerializer extends StdSerializer + private static class BloomKFilterSerializer extends StdSerializer { - public BloomKFilterSerializer() + BloomKFilterSerializer() { super(BloomKFilter.class); } @@ -60,17 +61,14 @@ public void serialize( BloomKFilter bloomKFilter, JsonGenerator jsonGenerator, SerializerProvider serializerProvider ) throws IOException { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - BloomKFilter.serialize(byteArrayOutputStream, bloomKFilter); - 
byte[] bytes = byteArrayOutputStream.toByteArray(); - jsonGenerator.writeBinary(bytes); + jsonGenerator.writeBinary(bloomKFilterToBytes(bloomKFilter)); } } - public static class BloomKFilterDeserializer extends StdDeserializer + private static class BloomKFilterDeserializer extends StdDeserializer { - protected BloomKFilterDeserializer() + BloomKFilterDeserializer() { super(BloomKFilter.class); } @@ -78,11 +76,37 @@ protected BloomKFilterDeserializer() @Override public BloomKFilter deserialize( JsonParser jsonParser, DeserializationContext deserializationContext - ) throws IOException, JsonProcessingException + ) throws IOException { - byte[] bytes = jsonParser.getBinaryValue(); - return BloomKFilter.deserialize(new ByteArrayInputStream(bytes)); + return bloomKFilterFromBytes(jsonParser.getBinaryValue()); + } + } + private static class BloomKFilterHolderDeserializer extends StdDeserializer + { + BloomKFilterHolderDeserializer() + { + super(BloomKFilterHolder.class); } + + @Override + public BloomKFilterHolder deserialize( + JsonParser jsonParser, DeserializationContext deserializationContext + ) throws IOException + { + return BloomKFilterHolder.fromBytes(jsonParser.getBinaryValue()); + } + } + + public static byte[] bloomKFilterToBytes(BloomKFilter bloomKFilter) throws IOException + { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + BloomKFilter.serialize(byteArrayOutputStream, bloomKFilter); + return byteArrayOutputStream.toByteArray(); + } + + public static BloomKFilter bloomKFilterFromBytes(byte[] bytes) throws IOException + { + return BloomKFilter.deserialize(new ByteArrayInputStream(bytes)); } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java index bfaa37c8857a..383488cc41ac 100644 --- 
a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomDimFilter.java @@ -25,15 +25,13 @@ import com.google.common.base.Predicate; import com.google.common.collect.RangeSet; import com.google.common.collect.Sets; -import org.apache.druid.java.util.common.ISE; +import com.google.common.hash.HashCode; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.extraction.ExtractionFn; import org.apache.druid.segment.filter.DimensionPredicateFilter; import org.apache.hive.common.util.BloomKFilter; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.util.HashSet; /** @@ -43,39 +41,33 @@ public class BloomDimFilter implements DimFilter private final String dimension; private final BloomKFilter bloomKFilter; + private final HashCode hash; private final ExtractionFn extractionFn; @JsonCreator public BloomDimFilter( @JsonProperty("dimension") String dimension, - @JsonProperty("bloomKFilter") BloomKFilter bloomKFilter, + @JsonProperty("bloomKFilter") BloomKFilterHolder bloomKFilterHolder, @JsonProperty("extractionFn") ExtractionFn extractionFn ) { Preconditions.checkArgument(dimension != null, "dimension must not be null"); - Preconditions.checkNotNull(bloomKFilter); + Preconditions.checkNotNull(bloomKFilterHolder); this.dimension = dimension; - this.bloomKFilter = bloomKFilter; + this.bloomKFilter = bloomKFilterHolder.getFilter(); + this.hash = bloomKFilterHolder.getFilterHash(); this.extractionFn = extractionFn; } @Override public byte[] getCacheKey() { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - try { - BloomKFilter.serialize(byteArrayOutputStream, bloomKFilter); - } - catch (IOException e) { - throw new ISE(e, "Exception when generating cache key for [%s]", this); - } - byte[] bloomFilterBytes = 
byteArrayOutputStream.toByteArray(); return new CacheKeyBuilder(DimFilterUtils.BLOOM_DIM_FILTER_CACHE_ID) .appendString(dimension) .appendByte(DimFilterUtils.STRING_SEPARATOR) .appendByteArray(extractionFn == null ? new byte[0] : extractionFn.getCacheKey()) .appendByte(DimFilterUtils.STRING_SEPARATOR) - .appendByteArray(bloomFilterBytes) + .appendByteArray(hash.asBytes()) .build(); } @@ -187,9 +179,9 @@ public ExtractionFn getExtractionFn() public String toString() { if (extractionFn != null) { - return StringUtils.format("%s(%s) = %s", extractionFn, dimension, bloomKFilter); + return StringUtils.format("%s(%s) = %s", extractionFn, dimension, hash.toString()); } else { - return StringUtils.format("%s = %s", dimension, bloomKFilter); + return StringUtils.format("%s = %s", dimension, hash.toString()); } } @@ -208,7 +200,7 @@ public boolean equals(Object o) if (!dimension.equals(that.dimension)) { return false; } - if (bloomKFilter != null ? !bloomKFilter.equals(that.bloomKFilter) : that.bloomKFilter != null) { + if (hash != null ? !hash.equals(that.hash) : that.hash != null) { return false; } return extractionFn != null ? extractionFn.equals(that.extractionFn) : that.extractionFn == null; @@ -230,7 +222,7 @@ public HashSet getRequiredColumns() public int hashCode() { int result = dimension.hashCode(); - result = 31 * result + (bloomKFilter != null ? bloomKFilter.hashCode() : 0); + result = 31 * result + (hash != null ? hash.hashCode() : 0); result = 31 * result + (extractionFn != null ? 
extractionFn.hashCode() : 0); return result; } }
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilterHolder.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilterHolder.java
new file mode 100644
index 000000000000..e06632f0a60c
--- /dev/null
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilterHolder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.filter;
+
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import org.apache.druid.guice.BloomFilterSerializersModule;
+import org.apache.hive.common.util.BloomKFilter;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Pairs a {@link BloomKFilter} with a hash of its serialized bytes, so that equality checks and cache keys can
+ * use the compact hash instead of the (potentially very large) filter itself.
+ */
+public class BloomKFilterHolder
+{
+  private final BloomKFilter filter;
+  private final HashCode hash;
+
+  /**
+   * Stores the given filter and hash as-is; neither argument is validated here. Prefer
+   * {@link #fromBloomKFilter} or {@link #fromBytes}, which derive the hash from the serialized filter.
+   */
+  public BloomKFilterHolder(BloomKFilter filter, HashCode hash)
+  {
+    this.filter = filter;
+    this.hash = hash;
+  }
+
+  BloomKFilter getFilter()
+  {
+    return filter;
+  }
+
+  HashCode getFilterHash()
+  {
+    return hash;
+  }
+
+  /**
+   * Serializes the filter and wraps it together with the SHA-512 hash of the serialized bytes.
+   */
+  public static BloomKFilterHolder fromBloomKFilter(BloomKFilter filter) throws IOException
+  {
+    byte[] bytes = BloomFilterSerializersModule.bloomKFilterToBytes(filter);
+
+    return new BloomKFilterHolder(filter, Hashing.sha512().hashBytes(bytes));
+  }
+
+  /**
+   * Deserializes a filter from the given bytes and wraps it together with the SHA-512 hash of those same bytes.
+   */
+  public static BloomKFilterHolder fromBytes(byte[] bytes) throws IOException
+  {
+    return new BloomKFilterHolder(
+        BloomFilterSerializersModule.bloomKFilterFromBytes(bytes),
+        Hashing.sha512().hashBytes(bytes)
+    );
+  }
+
+  /**
+   * Two holders are equal when their hashes are equal; the wrapped filter is not compared.
+   */
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    BloomKFilterHolder that = (BloomKFilterHolder) o;
+    return Objects.equals(this.hash, that.hash);
+  }
+
+  /**
+   * Derived from the hash only, mirroring {@link #equals}, so that equal holders always share a hash code.
+   */
+  @Override
+  public int hashCode()
+  {
+    return Objects.hashCode(hash);
+  }
+}
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java
index 181235a34778..81a272f5f95a 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java
@@ -90,6 +90,8 @@ public class BloomDimFilterTest extends
BaseFilterTest PARSER.parseBatch(ImmutableMap.of("dim0", "5", "dim1", "abc")).get(0) ); + private static DefaultObjectMapper mapper = new DefaultObjectMapper(); + public BloomDimFilterTest( String testName, IndexBuilder indexBuilder, @@ -111,8 +113,6 @@ public BloomDimFilterTest( ); } - private static DefaultObjectMapper mapper = new DefaultObjectMapper(); - @BeforeClass public static void beforeClass() { @@ -130,9 +130,10 @@ public void testSerde() throws IOException { BloomKFilter bloomFilter = new BloomKFilter(1500); bloomFilter.addString("myTestString"); + BloomKFilterHolder holder = new BloomKFilterHolder(bloomFilter, null); BloomDimFilter bloomDimFilter = new BloomDimFilter( "abc", - bloomFilter, + holder, new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true) ); DimFilter filter = mapper.readValue(mapper.writeValueAsBytes(bloomDimFilter), DimFilter.class); @@ -145,7 +146,7 @@ public void testSerde() throws IOException } @Test - public void testWithTimeExtractionFnNull() + public void testWithTimeExtractionFnNull() throws IOException { assertFilterMatches(new BloomDimFilter( "dim0", @@ -170,7 +171,7 @@ public void testWithTimeExtractionFnNull() } @Test - public void testSingleValueStringColumnWithoutNulls() + public void testSingleValueStringColumnWithoutNulls() throws IOException { assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, (String) null), null), ImmutableList.of()); assertFilterMatches(new BloomDimFilter("dim0", bloomKFilter(1000, ""), null), ImmutableList.of()); @@ -179,7 +180,7 @@ public void testSingleValueStringColumnWithoutNulls() } @Test - public void testSingleValueStringColumnWithNulls() + public void testSingleValueStringColumnWithNulls() throws IOException { if (NullHandling.replaceWithDefault()) { assertFilterMatches(new BloomDimFilter("dim1", bloomKFilter(1000, (String) null), null), ImmutableList.of("0")); @@ -196,7 +197,7 @@ public void testSingleValueStringColumnWithNulls() } @Test - public void 
testMultiValueStringColumn() + public void testMultiValueStringColumn() throws IOException { if (NullHandling.replaceWithDefault()) { assertFilterMatches( @@ -217,7 +218,7 @@ public void testMultiValueStringColumn() } @Test - public void testMissingColumnSpecifiedInDimensionList() + public void testMissingColumnSpecifiedInDimensionList() throws IOException { assertFilterMatches( new BloomDimFilter("dim3", bloomKFilter(1000, (String) null), null), @@ -230,7 +231,7 @@ public void testMissingColumnSpecifiedInDimensionList() } @Test - public void testMissingColumnNotSpecifiedInDimensionList() + public void testMissingColumnNotSpecifiedInDimensionList() throws IOException { assertFilterMatches( new BloomDimFilter("dim4", bloomKFilter(1000, (String) null), null), @@ -243,7 +244,7 @@ public void testMissingColumnNotSpecifiedInDimensionList() } @Test - public void testExpressionVirtualColumn() + public void testExpressionVirtualColumn() throws IOException { assertFilterMatches( new BloomDimFilter("expr", bloomKFilter(1000, 1.1F), null), @@ -263,7 +264,7 @@ public void testExpressionVirtualColumn() } @Test - public void testSelectorWithLookupExtractionFn() + public void testSelectorWithLookupExtractionFn() throws IOException { final Map stringMap = ImmutableMap.of( "1", "HELLO", @@ -334,7 +335,32 @@ public void testSelectorWithLookupExtractionFn() } } - private static BloomKFilter bloomKFilter(int expectedEntries, String... values) + @Test + public void testCacheKeyIsNotGiantIfFilterIsGiant() throws IOException + { + BloomKFilter bloomFilter = new BloomKFilter(10_000_000); + // FILL IT UP! 
+ bloomFilter.addString("myTestString"); + + BloomKFilterHolder holder = BloomKFilterHolder.fromBloomKFilter(bloomFilter); + + BloomDimFilter bloomDimFilter = new BloomDimFilter( + "abc", + holder, + new TimeDimExtractionFn("yyyy-MM-dd", "yyyy-MM", true) + ); + + byte[] bloomFilterBytes = BloomFilterSerializersModule.bloomKFilterToBytes(bloomFilter); + + // serialized filter can be quite large for high capacity bloom filters... + Assert.assertTrue(bloomFilterBytes.length > 7794000); + + // actual size is 86 bytes instead of 7794075 bytes of old key format + final int actualSize = bloomDimFilter.getCacheKey().length; + Assert.assertTrue(actualSize < 100); + } + + private static BloomKFilterHolder bloomKFilter(int expectedEntries, String... values) throws IOException { BloomKFilter filter = new BloomKFilter(expectedEntries); for (String value : values) { @@ -344,10 +370,11 @@ private static BloomKFilter bloomKFilter(int expectedEntries, String... values) filter.addString(value); } } - return filter; + + return BloomKFilterHolder.fromBloomKFilter(filter); } - private static BloomKFilter bloomKFilter(int expectedEntries, Float... values) + private static BloomKFilterHolder bloomKFilter(int expectedEntries, Float... values) throws IOException { BloomKFilter filter = new BloomKFilter(expectedEntries); for (Float value : values) { @@ -357,10 +384,10 @@ private static BloomKFilter bloomKFilter(int expectedEntries, Float... values) filter.addFloat(value); } } - return filter; + return BloomKFilterHolder.fromBloomKFilter(filter); } - private static BloomKFilter bloomKFilter(int expectedEntries, Double... values) + private static BloomKFilterHolder bloomKFilter(int expectedEntries, Double... values) throws IOException { BloomKFilter filter = new BloomKFilter(expectedEntries); for (Double value : values) { @@ -370,10 +397,10 @@ private static BloomKFilter bloomKFilter(int expectedEntries, Double... 
values) filter.addDouble(value); } } - return filter; + return BloomKFilterHolder.fromBloomKFilter(filter); } - private static BloomKFilter bloomKFilter(int expectedEntries, Long... values) + private static BloomKFilterHolder bloomKFilter(int expectedEntries, Long... values) throws IOException { BloomKFilter filter = new BloomKFilter(expectedEntries); for (Long value : values) { @@ -383,6 +410,6 @@ private static BloomKFilter bloomKFilter(int expectedEntries, Long... values) filter.addLong(value); } } - return filter; + return BloomKFilterHolder.fromBloomKFilter(filter); } }